From 4d45dd3268cdb2a1aa2225b13ea957bca899ed6b Mon Sep 17 00:00:00 2001 From: Rob Pike Date: Wed, 20 Jan 2010 14:12:29 +1100 Subject: [PATCH] first part of networked channels. limitations: poor error handling teardown not done exporter must send, importer must receive testing is rudimentary at best R=rsc CC=golang-dev https://golang.org/cl/186234 --- src/pkg/netchan/Makefile | 13 ++ src/pkg/netchan/common.go | 63 ++++++++++ src/pkg/netchan/export.go | 211 ++++++++++++++++++++++++++++++++ src/pkg/netchan/import.go | 149 ++++++++++++++++++++++ src/pkg/netchan/netchan_test.go | 50 ++++++++ 5 files changed, 486 insertions(+) create mode 100644 src/pkg/netchan/Makefile create mode 100644 src/pkg/netchan/common.go create mode 100644 src/pkg/netchan/export.go create mode 100644 src/pkg/netchan/import.go create mode 100644 src/pkg/netchan/netchan_test.go diff --git a/src/pkg/netchan/Makefile b/src/pkg/netchan/Makefile new file mode 100644 index 00000000000..a8a5c6a3cab --- /dev/null +++ b/src/pkg/netchan/Makefile @@ -0,0 +1,13 @@ +# Copyright 2009 The Go Authors. All rights reserved. +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file. + +include ../../Make.$(GOARCH) + +TARG=netchan +GOFILES=\ + common.go\ + export.go\ + import.go\ + +include ../../Make.pkg diff --git a/src/pkg/netchan/common.go b/src/pkg/netchan/common.go new file mode 100644 index 00000000000..a82bd07c16b --- /dev/null +++ b/src/pkg/netchan/common.go @@ -0,0 +1,63 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netchan + +import ( + "gob" + "log" + "net" + "os" + "sync" +) + +type Dir int + +const ( + Recv Dir = iota + Send +) + +// Mutex-protected encoder and decoder pair + +type encDec struct { + decLock sync.Mutex + dec *gob.Decoder + encLock sync.Mutex + enc *gob.Encoder +} + +func newEncDec(conn net.Conn) *encDec { + return &encDec{ + dec: gob.NewDecoder(conn), + enc: gob.NewEncoder(conn), + } +} + +func (ed *encDec) decode(e interface{}) os.Error { + ed.decLock.Lock() + defer ed.decLock.Unlock() + err := ed.dec.Decode(e) + if err != nil { + log.Stderr("exporter decode:", err) + // TODO: tear down connection + return err + } + return nil +} + +func (ed *encDec) encode(e0, e1 interface{}) os.Error { + ed.encLock.Lock() + defer ed.encLock.Unlock() + err := ed.enc.Encode(e0) + if err == nil && e1 != nil { + err = ed.enc.Encode(e1) + } + if err != nil { + log.Stderr("exporter encode:", err) + // TODO: tear down connection? + return err + } + return nil +} diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go new file mode 100644 index 00000000000..2bbebc951af --- /dev/null +++ b/src/pkg/netchan/export.go @@ -0,0 +1,211 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* + The netchan package implements type-safe networked channels: + it allows the two ends of a channel to appear on different + computers connected by a network. It does this by transporting + data sent to a channel on one machine so it can be recovered + by a receive of a channel of the same type on the other. + + An exporter publishes a set of channels by name. An importer + connects to the exporting machine and imports the channels + by name. After importing the channels, the two machines can + use the channels in the usual way. + + Networked channels are not synchronized; they always behave + as if there is a buffer of at least one element between the + two machines. + + TODO: at the moment, the exporting machine must send and + the importing machine must receive. This restriction will + be lifted soon. +*/ +package netchan + +import ( + "log" + "net" + "os" + "reflect" + "sync" +) + +// Export + +// A channel and its associated information: a direction +type exportChan struct { + ch *reflect.ChanValue + dir Dir +} + +// An Exporter allows a set of channels to be published on a single +// network port. A single machine may have multiple Exporters +// but they must use different ports. +type Exporter struct { + listener net.Listener + chanLock sync.Mutex // protects access to channel map + chans map[string]*exportChan +} + +type expClient struct { + *encDec + exp *Exporter +} + +func newClient(exp *Exporter, conn net.Conn) *expClient { + client := new(expClient) + client.exp = exp + client.encDec = newEncDec(conn) + return client + +} + +// TODO: ASSUMES EXPORT MEANS SEND + +// Sent once per channel from importer to exporter to report that it's listening to a channel +type request struct { + name string + dir Dir + count int +} + +// Reply to request, sent from exporter to importer on each send. +type response struct { + name string + error string +} + +// Wait for incoming connections, start a new runner for each +func (exp *Exporter) listen() { + for { + conn, err := exp.listener.Accept() + if err != nil { + log.Stderr("exporter.listen:", err) + break + } + log.Stderr("accepted call from", conn.RemoteAddr()) + client := newClient(exp, conn) + go client.run() + } +} + +// Send a single client all its data. For each request, this will launch +// a serveRecv goroutine to deliver the data for that channel. +func (client *expClient) run() { + req := new(request) + for { + if err := client.decode(req); err != nil { + log.Stderr("error decoding client request:", err) + // TODO: tear down connection + break + } + log.Stderrf("export request: %+v", req) + if req.dir == Recv { + go client.serveRecv(req) + } else { + log.Stderr("export request: can't handle channel direction", req.dir) + resp := new(response) + resp.name = req.name + resp.error = "export request: can't handle channel direction" + client.encode(resp, nil) + break + } + } +} + +// Send all the data on a single channel to a client asking for a Recv +func (client *expClient) serveRecv(req *request) { + exp := client.exp + resp := new(response) + resp.name = req.name + var ok bool + exp.chanLock.Lock() + ech, ok := exp.chans[req.name] + exp.chanLock.Unlock() + if !ok { + resp.error = "no such channel: " + req.name + log.Stderr("export:", resp.error) + client.encode(resp, nil) // ignore any encode error, hope client gets it + return + } + for { + if ech.dir != Send { + log.Stderr("TODO: recv export unimplemented") + break + } + val := ech.ch.Recv() + if err := client.encode(resp, val.Interface()); err != nil { + log.Stderr("error encoding client response:", err) + break + } + if req.count > 0 { + req.count-- + if req.count == 0 { + break + } + } + } +} + +// NewExporter creates a new Exporter to export channels +// on the network and local address defined as in net.Listen. +func NewExporter(network, localaddr string) (*Exporter, os.Error) { + listener, err := net.Listen(network, localaddr) + if err != nil { + return nil, err + } + e := &Exporter{ + listener: listener, + chans: make(map[string]*exportChan), + } + go e.listen() + return e, nil +} + +// Addr returns the Exporter's local network address. +func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() } + +func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) { + chanType, ok := reflect.Typeof(chT).(*reflect.ChanType) + if !ok { + return nil, os.ErrorString("not a channel") + } + if dir != Send && dir != Recv { + return nil, os.ErrorString("unknown channel direction") + } + switch chanType.Dir() { + case reflect.BothDir: + case reflect.SendDir: + if dir != Recv { + return nil, os.ErrorString("to import/export with Send, must provide <-chan") + } + case reflect.RecvDir: + if dir != Send { + return nil, os.ErrorString("to import/export with Recv, must provide chan<-") + } + } + return reflect.NewValue(chT).(*reflect.ChanValue), nil +} + +// Export exports a channel of a given type and specified direction. The +// channel to be exported is provided in the call and may be of arbitrary +// channel type. +// Despite the literal signature, the effective signature is +// Export(name string, chT chan T, dir Dir) +// where T must be a struct, pointer to struct, etc. +func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { + ch, err := checkChan(chT, dir) + if err != nil { + return err + } + exp.chanLock.Lock() + defer exp.chanLock.Unlock() + _, present := exp.chans[name] + if present { + return os.ErrorString("channel name already being exported:" + name) + } + exp.chans[name] = &exportChan{ch, dir} + return nil +} diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go new file mode 100644 index 00000000000..263ee4404b9 --- /dev/null +++ b/src/pkg/netchan/import.go @@ -0,0 +1,149 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netchan + +import ( + "log" + "net" + "os" + "reflect" + "sync" +) + +// Import + +// A channel and its associated information: a template value, direction and a count +type importChan struct { + ch *reflect.ChanValue + dir Dir + ptr *reflect.PtrValue // a pointer value we can point at each new item + count int +} + +// An Importer allows a set of channels to be imported from a single +// remote machine/network port. A machine may have multiple +// importers, even from the same machine/network port. +type Importer struct { + *encDec + conn net.Conn + chanLock sync.Mutex // protects access to channel map + chans map[string]*importChan +} + +// TODO: ASSUMES IMPORT MEANS RECEIVE + +// NewImporter creates a new Importer object to import channels +// from an Exporter at the network and remote address as defined in net.Dial. +// The Exporter must be available and serving when the Importer is +// created. +func NewImporter(network, remoteaddr string) (*Importer, os.Error) { + conn, err := net.Dial(network, "", remoteaddr) + if err != nil { + return nil, err + } + imp := new(Importer) + imp.encDec = newEncDec(conn) + imp.conn = conn + imp.chans = make(map[string]*importChan) + go imp.run() + return imp, nil +} + +// Handle the data from a single imported data stream, which will +// have the form +// (response, data)* +// The response identifies by name which channel is receiving data. +// TODO: allow an importer to send. +func (imp *Importer) run() { + // Loop on responses; requests are sent by ImportNValues() + resp := new(response) + for { + if err := imp.decode(resp); err != nil { + log.Stderr("importer response decode:", err) + break + } + if resp.error != "" { + log.Stderr("importer response error:", resp.error) + // TODO: tear down connection + break + } + imp.chanLock.Lock() + ich, ok := imp.chans[resp.name] + imp.chanLock.Unlock() + if !ok { + log.Stderr("unknown name in request:", resp.name) + break + } + if ich.dir != Recv { + log.Stderr("TODO: import send unimplemented") + break + } + // Create a new value for each received item. + val := reflect.MakeZero(ich.ptr.Type().(*reflect.PtrType).Elem()) + ich.ptr.PointTo(val) + if err := imp.decode(ich.ptr.Interface()); err != nil { + log.Stderr("importer value decode:", err) + return + } + ich.ch.Send(val) + } +} + +// Import imports a channel of the given type and specified direction. +// It is equivalent to ImportNValues with a count of 0, meaning unbounded. +func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{}) os.Error { + return imp.ImportNValues(name, chT, dir, pT, 0) +} + +// ImportNValues imports a channel of the given type and specified direction +// and then receives or transmits up to n values on that channel. A value of +// n==0 implies an unbounded number of values. The channel to be bound to +// the remote site's channel is provided in the call and may be of arbitrary +// channel type. +// Despite the literal signature, the effective signature is +// ImportNValues(name string, chT chan T, dir Dir, pT T) +// where T must be a struct, pointer to struct, etc. pT may be more indirect +// than the value type of the channel (e.g. chan T, pT *T) but it must be a +// pointer. +// Example usage: +// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234") +// if err != nil { log.Exit(err) } +// ch := make(chan myType) +// err := imp.ImportNValues("name", ch, Recv, new(myType), 1) +// if err != nil { log.Exit(err) } +// fmt.Printf("%+v\n", <-ch) +// (TODO: Can we eliminate the need for pT?) +func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT interface{}, n int) os.Error { + ch, err := checkChan(chT, dir) + if err != nil { + return err + } + // Make sure pT is a pointer (to a pointer...) to a struct. + rt := reflect.Typeof(pT) + if _, ok := rt.(*reflect.PtrType); !ok { + return os.ErrorString("not a pointer:" + rt.String()) + } + if _, ok := reflect.Indirect(reflect.NewValue(pT)).(*reflect.StructValue); !ok { + return os.ErrorString("not a pointer to a struct:" + rt.String()) + } + imp.chanLock.Lock() + defer imp.chanLock.Unlock() + _, present := imp.chans[name] + if present { + return os.ErrorString("channel name already being imported:" + name) + } + ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue) + imp.chans[name] = &importChan{ch, dir, ptr, n} + // Tell the other side about this channel. + req := new(request) + req.name = name + req.dir = dir + req.count = n + if err := imp.encode(req, nil); err != nil { + log.Stderr("importer request encode:", err) + return err + } + return nil +} diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go new file mode 100644 index 00000000000..b7fd100cf5e --- /dev/null +++ b/src/pkg/netchan/netchan_test.go @@ -0,0 +1,50 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netchan + +import ( + "fmt" + "testing" +) + +type value struct { + i int + s string +} + +func exportSend(exp *Exporter, t *testing.T) { + c := make(chan value) + err := exp.Export("name", c, Send) + if err != nil { + t.Fatal("export:", err) + } + c <- value{23, "hello"} +} + +func importReceive(imp *Importer, t *testing.T) { + ch := make(chan value) + err := imp.ImportNValues("name", ch, Recv, new(value), 1) + if err != nil { + t.Fatal("import:", err) + } + v := <-ch + fmt.Printf("%v\n", v) + if v.i != 23 || v.s != "hello" { + t.Errorf("bad value: expected 23, hello; got %+v\n", v) + } +} + +func TestBabyStep(t *testing.T) { + exp, err := NewExporter("tcp", ":0") + if err != nil { + t.Fatal("new exporter:", err) + } + go exportSend(exp, t) + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + importReceive(imp, t) +}