From 1ffb1f2b66253ae52d3990e6449f251168b7ea9b Mon Sep 17 00:00:00 2001 From: Rob Pike Date: Mon, 18 Oct 2010 15:09:43 -0700 Subject: [PATCH] netchan: add new method Hangup to terminate transmission on a channel Fixes #1151. R=rsc CC=golang-dev https://golang.org/cl/2469043 --- src/pkg/netchan/export.go | 16 +++++++ src/pkg/netchan/import.go | 16 +++++++ src/pkg/netchan/netchan_test.go | 76 ++++++++++++++++++++++++++++++++- 3 files changed, 107 insertions(+), 1 deletion(-) diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index 642fc6596b..318b865b0b 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -346,3 +346,19 @@ func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { exp.chans[name] = &chanDir{ch, dir} return nil } + +// Hangup disassociates the named channel from the Exporter and closes +// the channel. Messages in flight for the channel may be dropped. +func (exp *Exporter) Hangup(name string) os.Error { + exp.mu.Lock() + chDir, ok := exp.chans[name] + if ok { + exp.chans[name] = nil, false + } + exp.mu.Unlock() + if !ok { + return os.ErrorString("netchan export: hangup: no such channel: " + name) + } + chDir.ch.Close() + return nil +} diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index 034cc2f9d4..eef8e9397c 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -203,3 +203,19 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int) } return nil } + +// Hangup disassociates the named channel from the Importer and closes +// the channel. Messages in flight for the channel may be dropped. +func (imp *Importer) Hangup(name string) os.Error { + imp.chanLock.Lock() + chDir, ok := imp.chans[name] + if ok { + imp.chans[name] = nil, false + } + imp.chanLock.Unlock() + if !ok { + return os.ErrorString("netchan import: hangup: no such channel: " + name) + } + chDir.ch.Close() + return nil +} diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go index 6efb8c5d74..707111a094 100644 --- a/src/pkg/netchan/netchan_test.go +++ b/src/pkg/netchan/netchan_test.go @@ -230,12 +230,86 @@ func TestExportSync(t *testing.T) { <-done } +// Test hanging up the send side of an export. +// TODO: test hanging up the receive side of an export. +func TestExportHangup(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + ech := make(chan int) + err = exp.Export("exportedSend", ech, Send) + if err != nil { + t.Fatal("export:", err) + } + // Prepare to receive two values. We'll actually deliver only one. + ich := make(chan int) + err = imp.ImportNValues("exportedSend", ich, Recv, 2) + if err != nil { + t.Fatal("import exportedSend:", err) + } + // Send one value, receive it. + const Value = 1234 + ech <- Value + v := <-ich + if v != Value { + t.Fatal("expected", Value, "got", v) + } + // Now hang up the channel. Importer should see it close. + exp.Hangup("exportedSend") + v = <-ich + if !closed(ich) { + t.Fatal("expected channel to be closed; got value", v) + } +} + +// Test hanging up the send side of an import. +// TODO: test hanging up the receive side of an import. +func TestImportHangup(t *testing.T) { + exp, err := NewExporter("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + ech := make(chan int) + err = exp.Export("exportedRecv", ech, Recv) + if err != nil { + t.Fatal("export:", err) + } + // Prepare to Send two values. We'll actually deliver only one. + ich := make(chan int) + err = imp.ImportNValues("exportedRecv", ich, Send, 2) + if err != nil { + t.Fatal("import exportedRecv:", err) + } + // Send one value, receive it. + const Value = 1234 + ich <- Value + v := <-ech + if v != Value { + t.Fatal("expected", Value, "got", v) + } + // Now hang up the channel. Exporter should see it close. + imp.Hangup("exportedRecv") + v = <-ech + if !closed(ech) { + t.Fatal("expected channel to be closed; got value", v) + } +} + +// This test cross-connects a pair of exporter/importer pairs. type value struct { i int source string } -// This test cross-connects a pair of exporter/importer pairs. func TestCrossConnect(t *testing.T) { e1, err := NewExporter("tcp", "127.0.0.1:0") if err != nil {