diff --git a/src/lib/net/dnsclient.go b/src/lib/net/dnsclient.go index e84d4dcfffc..c0a4177315a 100644 --- a/src/lib/net/dnsclient.go +++ b/src/lib/net/dnsclient.go @@ -62,14 +62,15 @@ func _Exchange(cfg *_DNS_Config, c Conn, name string) (m *_DNS_Msg, err *os.Erro return nil, err } - // TODO(rsc): set up timeout or call ReadTimeout. - // right now net does not support that. + c.SetReadTimeout(1e9); // nanoseconds buf := make([]byte, 2000); // More than enough. n, err = c.Read(buf); + if err == os.EAGAIN { + continue; + } if err != nil { - // TODO(rsc): only continue if timed out - continue + return nil, err; } buf = buf[0:n]; in := new(_DNS_Msg); @@ -84,7 +85,7 @@ func _Exchange(cfg *_DNS_Config, c Conn, name string) (m *_DNS_Msg, err *os.Erro // Find answer for name in dns message. // On return, if err == nil, addrs != nil. -// TODO(rsc): Maybe return [][]byte (==[]IPAddr) instead? +// TODO(rsc): Maybe return []IP instead? func answer(name string, dns *_DNS_Msg) (addrs []string, err *os.Error) { addrs = make([]string, 0, len(dns.answer)); diff --git a/src/lib/net/dnsconfig.go b/src/lib/net/dnsconfig.go index afdbd91179d..385d07b6ae7 100644 --- a/src/lib/net/dnsconfig.go +++ b/src/lib/net/dnsconfig.go @@ -29,7 +29,7 @@ var _DNS_configError *os.Error; // of the host name to get the default search domain. // We assume it's in resolv.conf anyway. func _DNS_ReadConfig() (*_DNS_Config, *os.Error) { - // TODO(rsc): 6g won't let me use "file :=" + // TODO(rsc): 6g won't let me say file, err := var file *file; var err *os.Error; file, err = open("/etc/resolv.conf"); diff --git a/src/lib/net/fd.go b/src/lib/net/fd.go index 501a3f3a9a1..7509231925b 100644 --- a/src/lib/net/fd.go +++ b/src/lib/net/fd.go @@ -10,6 +10,7 @@ import ( "net"; "once"; "os"; + "sync"; "syscall"; ) @@ -24,6 +25,14 @@ type netFD struct { laddr string; raddr string; + // owned by client + rdeadline_delta int64; + rdeadline int64; + rio sync.Mutex; + wdeadline_delta int64; + wdeadline int64; + wio sync.Mutex; + // owned by fd wait server ncr, ncw int; } @@ -41,6 +50,14 @@ func setNonblock(fd int64) *os.Error { return nil } +// Make reads/writes blocking; last gasp, so no error checking. +func setBlock(fd int64) { + flags, e := syscall.Fcntl(fd, syscall.F_GETFL, 0); + if e != 0 { + return; + } + syscall.Fcntl(fd, syscall.F_SETFL, flags & ^syscall.O_NONBLOCK); +} // A pollServer helps FDs determine when to retry a non-blocking // read or write after they get EAGAIN. When an FD needs to wait, @@ -76,6 +93,7 @@ type pollServer struct { pr, pw *os.FD; pending map[int64] *netFD; poll *pollster; // low-level OS hooks + deadline int64; // next deadline (nsec since 1970) } func (s *pollServer) Run(); @@ -109,18 +127,24 @@ func newPollServer() (s *pollServer, err *os.Error) { func (s *pollServer) AddFD(fd *netFD, mode int) { if err := s.poll.AddFD(fd.fd, mode, false); err != nil { - print("pollServer AddFD: ", err.String(), "\n"); + panicln("pollServer AddFD ", fd.fd, ": ", err.String(), "\n"); return } + var t int64; key := fd.fd << 1; if mode == 'r' { fd.ncr++; + t = fd.rdeadline; } else { fd.ncw++; key++; + t = fd.wdeadline; + } + s.pending[key] = fd; + if t > 0 && (s.deadline == 0 || t < s.deadline) { + s.deadline = t; } - s.pending[key] = fd } func (s *pollServer) LookupFD(fd int64, mode int) *netFD { @@ -136,14 +160,88 @@ func (s *pollServer) LookupFD(fd int64, mode int) *netFD { return netfd } +func (s *pollServer) WakeFD(fd *netFD, mode int) { + if mode == 'r' { + for fd.ncr > 0 { + fd.ncr--; + fd.cr <- fd + } + } else { + for fd.ncw > 0 { + fd.ncw--; + fd.cw <- fd + } + } +} + +func (s *pollServer) Now() int64 { + sec, nsec, err := os.Time(); + if err != nil { + panic("net: os.Time: ", err.String()); + } + nsec += sec * 1e9; + return nsec; +} + +func (s *pollServer) CheckDeadlines() { + now := s.Now(); + // TODO(rsc): This will need to be handled more efficiently, + // probably with a heap indexed by wakeup time. + + var next_deadline int64; + for key, fd := range s.pending { + var t int64; + var mode int; + if key&1 == 0 { + mode = 'r'; + } else { + mode = 'w'; + } + if mode == 'r' { + t = fd.rdeadline; + } else { + t = fd.wdeadline; + } + if t > 0 { + if t <= now { + s.pending[key] = nil, false; + if mode == 'r' { + s.poll.DelFD(fd.fd, mode); + fd.rdeadline = -1; + } else { + s.poll.DelFD(fd.fd, mode); + fd.wdeadline = -1; + } + s.WakeFD(fd, mode); + } else if next_deadline == 0 || t < next_deadline { + next_deadline = t; + } + } + } + s.deadline = next_deadline; +} + func (s *pollServer) Run() { var scratch [100]byte; for { - fd, mode, err := s.poll.WaitFD(); + var t = s.deadline; + if t > 0 { + t = t - s.Now(); + if t < 0 { + s.CheckDeadlines(); + continue; + } + } + fd, mode, err := s.poll.WaitFD(t); if err != nil { print("pollServer WaitFD: ", err.String(), "\n"); return } + if fd < 0 { + // Timeout happened. + s.CheckDeadlines(); + continue; + } if fd == s.pr.Fd() { // Drain our wakeup pipe. for nn, e := s.pr.Read(scratch); nn > 0; { @@ -163,17 +261,7 @@ func (s *pollServer) Run() { print("pollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n"); continue } - if mode == 'r' { - for netfd.ncr > 0 { - netfd.ncr--; - netfd.cr <- netfd - } - } else { - for netfd.ncw > 0 { - netfd.ncw--; - netfd.cw <- netfd - } - } + s.WakeFD(netfd, mode); } } } @@ -231,6 +319,15 @@ func (fd *netFD) Close() *os.Error { if fd == nil || fd.osfd == nil { return os.EINVAL } + + // In case the user has set linger, + // switch to blocking mode so the close blocks. + // As long as this doesn't happen often, + // we can handle the extra OS processes. + // Otherwise we'll need to use the pollserver + // for Close too. Sigh. + setBlock(fd.osfd.Fd()); + e := fd.osfd.Close(); fd.osfd = nil; fd.fd = -1; @@ -241,8 +338,15 @@ func (fd *netFD) Read(p []byte) (n int, err *os.Error) { if fd == nil || fd.osfd == nil { return -1, os.EINVAL } + fd.rio.Lock(); + defer fd.rio.Unlock(); + if fd.rdeadline_delta > 0 { + fd.rdeadline = pollserver.Now() + fd.rdeadline_delta; + } else { + fd.rdeadline = 0; + } n, err = fd.osfd.Read(p); - for err == os.EAGAIN { + for err == os.EAGAIN && fd.rdeadline >= 0 { pollserver.WaitRead(fd); n, err = fd.osfd.Read(p) } @@ -253,21 +357,29 @@ func (fd *netFD) Write(p []byte) (n int, err *os.Error) { if fd == nil || fd.osfd == nil { return -1, os.EINVAL } - // TODO(rsc): Lock fd while writing to avoid interlacing writes. + fd.wio.Lock(); + defer fd.wio.Unlock(); + if fd.wdeadline_delta > 0 { + fd.wdeadline = pollserver.Now() + fd.wdeadline_delta; + } else { + fd.wdeadline = 0; + } err = nil; nn := 0; - for nn < len(p) && err == nil { - // TODO(rsc): If os.FD.Write loops, have to use syscall instead. + for nn < len(p) { n, err = fd.osfd.Write(p[nn:len(p)]); - for err == os.EAGAIN { - pollserver.WaitWrite(fd); - n, err = fd.osfd.Write(p[nn:len(p)]) - } if n > 0 { nn += n } - if n == 0 { - break + if nn == len(p) { + break; + } + if err == os.EAGAIN && fd.wdeadline >= 0 { + pollserver.WaitWrite(fd); + continue; + } + if n == 0 || err != nil { + break; } } return nn, err diff --git a/src/lib/net/fd_darwin.go b/src/lib/net/fd_darwin.go index e5b74e7fc60..74f0f486779 100644 --- a/src/lib/net/fd_darwin.go +++ b/src/lib/net/fd_darwin.go @@ -62,15 +62,45 @@ func (p *pollster) AddFD(fd int64, mode int, repeat bool) *os.Error { return nil } -func (p *pollster) WaitFD() (fd int64, mode int, err *os.Error) { +func (p *pollster) DelFD(fd int64, mode int) { + var kmode int16; + if mode == 'r' { + kmode = syscall.EVFILT_READ + } else { + kmode = syscall.EVFILT_WRITE + } + var events [1]syscall.Kevent_t; + ev := &events[0]; + ev.Ident = fd; + ev.Filter = kmode; + + // EV_DELETE - delete event from kqueue list + // EV_RECEIPT - generate fake EV_ERROR as result of add, + // rather than waiting for real event + ev.Flags = syscall.EV_DELETE | syscall.EV_RECEIPT; + syscall.Kevent(p.kq, events, events, nil); +} + +func (p *pollster) WaitFD(nsec int64) (fd int64, mode int, err *os.Error) { + var t *syscall.Timespec; for len(p.events) == 0 { - nn, e := syscall.Kevent(p.kq, nil, p.eventbuf, nil); + if nsec > 0 { + if t == nil { + t = new(syscall.Timespec); + } + t.Sec = nsec / 1e9; + t.Nsec = uint64(nsec % 1e9); + } + nn, e := syscall.Kevent(p.kq, nil, p.eventbuf, t); if e != 0 { - if e == syscall.EAGAIN || e == syscall.EINTR { + if e == syscall.EINTR { continue } return -1, 0, os.ErrnoToError(e) } + if nn == 0 { + return -1, 0, nil; + } p.events = p.eventbuf[0:nn] } ev := &p.events[0]; diff --git a/src/lib/net/fd_linux.go b/src/lib/net/fd_linux.go index 0823260da90..8e2b57f224e 100644 --- a/src/lib/net/fd_linux.go +++ b/src/lib/net/fd_linux.go @@ -44,19 +44,19 @@ func (p *pollster) AddFD(fd int64, mode int, repeat bool) *os.Error { ev.Fd = int32(fd); ev.Events, already = p.events[fd]; if !repeat { - ev.Events |= syscall.EPOLLONESHOT + ev.Events |= syscall.EPOLLONESHOT; } if mode == 'r' { - ev.Events |= readFlags + ev.Events |= readFlags; } else { - ev.Events |= writeFlags + ev.Events |= writeFlags; } var op int64; if already { - op = syscall.EPOLL_CTL_MOD + op = syscall.EPOLL_CTL_MOD; } else { - op = syscall.EPOLL_CTL_ADD + op = syscall.EPOLL_CTL_ADD; } if e := syscall.Epoll_ctl(p.epfd, op, fd, &ev); e != 0 { return os.ErrnoToError(e) @@ -69,13 +69,13 @@ func (p *pollster) StopWaiting(fd int64, bits uint) { events, already := p.events[fd]; if !already { print("Epoll unexpected fd=", fd, "\n"); - return + return; } // If syscall.EPOLLONESHOT is not set, the wait // is a repeating wait, so don't change it. if events & syscall.EPOLLONESHOT == 0 { - return + return; } // Disable the given bits. @@ -87,50 +87,65 @@ func (p *pollster) StopWaiting(fd int64, bits uint) { ev.Fd = int32(fd); ev.Events = events; if e := syscall.Epoll_ctl(p.epfd, syscall.EPOLL_CTL_MOD, fd, &ev); e != 0 { - print("Epoll modify fd=", fd, ": ", os.ErrnoToError(e).String(), "\n") + print("Epoll modify fd=", fd, ": ", os.ErrnoToError(e).String(), "\n"); } - p.events[fd] = events + p.events[fd] = events; } else { if e := syscall.Epoll_ctl(p.epfd, syscall.EPOLL_CTL_DEL, fd, nil); e != 0 { - print("Epoll delete fd=", fd, ": ", os.ErrnoToError(e).String(), "\n") + print("Epoll delete fd=", fd, ": ", os.ErrnoToError(e).String(), "\n"); } - p.events[fd] = 0, false + p.events[fd] = 0, false; } } -func (p *pollster) WaitFD() (fd int64, mode int, err *os.Error) { +func (p *pollster) DelFD(fd int64, mode int) { + if mode == 'r' { + p.StopWaiting(fd, readFlags); + } else { + p.StopWaiting(fd, writeFlags); + } +} + +func (p *pollster) WaitFD(nsec int64) (fd int64, mode int, err *os.Error) { // Get an event. var evarray [1]syscall.EpollEvent; ev := &evarray[0]; - n, e := syscall.Epoll_wait(p.epfd, evarray, -1); + var msec int64 = -1; + if nsec > 0 { + msec = (nsec + 1e6 - 1)/1e6; + } + n, e := syscall.Epoll_wait(p.epfd, evarray, msec); for e == syscall.EAGAIN || e == syscall.EINTR { - n, e = syscall.Epoll_wait(p.epfd, evarray, -1) + n, e = syscall.Epoll_wait(p.epfd, evarray, msec); } if e != 0 { - return -1, 0, os.ErrnoToError(e) + return -1, 0, os.ErrnoToError(e); + } + if n == 0 { + return -1, 0, nil; } fd = int64(ev.Fd); if ev.Events & writeFlags != 0 { p.StopWaiting(fd, writeFlags); - return fd, 'w', nil + return fd, 'w', nil; } if ev.Events & readFlags != 0 { p.StopWaiting(fd, readFlags); - return fd, 'r', nil + return fd, 'r', nil; } // Other events are error conditions - wake whoever is waiting. events, already := p.events[fd]; if events & writeFlags != 0 { p.StopWaiting(fd, writeFlags); - return fd, 'w', nil + return fd, 'w', nil; } p.StopWaiting(fd, readFlags); - return fd, 'r', nil + return fd, 'r', nil; } func (p *pollster) Close() *os.Error { r, e := syscall.Close(p.epfd); - return os.ErrnoToError(e) + return os.ErrnoToError(e); } diff --git a/src/lib/net/net.go b/src/lib/net/net.go index c01c1053371..7ea5d2d4c65 100644 --- a/src/lib/net/net.go +++ b/src/lib/net/net.go @@ -255,11 +255,13 @@ func (c *connBase) SetWriteBuffer(bytes int) *os.Error { } func (c *connBase) SetReadTimeout(nsec int64) *os.Error { - return setsockopt_tv(c.sysFD(), syscall.SOL_SOCKET, syscall.SO_RCVTIMEO, nsec); + c.fd.rdeadline_delta = nsec; + return nil; } func (c *connBase) SetWriteTimeout(nsec int64) *os.Error { - return setsockopt_tv(c.sysFD(), syscall.SOL_SOCKET, syscall.SO_SNDTIMEO, nsec); + c.fd.wdeadline_delta = nsec; + return nil; } func (c *connBase) SetTimeout(nsec int64) *os.Error { @@ -432,29 +434,82 @@ func DialUDP(net, laddr, raddr string) (c *ConnUDP, err *os.Error) { // TODO: raw IP connections - // TODO: raw ethernet connections // A Conn is a generic network connection. type Conn interface { + // Read blocks until data is ready from the connection + // and then reads into b. It returns the number + // of bytes read, or 0 if the connection has been closed. Read(b []byte) (n int, err *os.Error); + + // Write writes the data in b to the connection. Write(b []byte) (n int, err *os.Error); + + // Close closes the connection. Close() *os.Error; - // For UDP sockets. + // For packet-based protocols such as UDP, + // ReadFrom reads the next packet from the network, + // returning the number of bytes read and the remote + // address that sent them. ReadFrom(b []byte) (n int, addr string, err *os.Error); + + // For packet-based protocols such as UDP, + // WriteTo writes the byte buffer b to the network + // as a single payload, sending it to the target address. WriteTo(addr string, b []byte) (n int, err *os.Error); - // Methods that have meaning only on some networks. + // SetReadBuffer sets the size of the operating system's + // receive buffer associated with the connection. SetReadBuffer(bytes int) *os.Error; + + // SetReadBuffer sets the size of the operating system's + // transmit buffer associated with the connection. SetWriteBuffer(bytes int) *os.Error; + + // SetTimeout sets the read and write deadlines associated + // with the connection. SetTimeout(nsec int64) *os.Error; + + // SetReadTimeout sets the time (in nanoseconds) that + // Read will wait for data before returning os.EAGAIN. + // Setting nsec == 0 (the default) disables the deadline. SetReadTimeout(nsec int64) *os.Error; + + // SetWriteTimeout sets the time (in nanoseconds) that + // Write will wait to send its data before returning os.EAGAIN. + // Setting nsec == 0 (the default) disables the deadline. + // Even if write times out, it may return n > 0, indicating that + // some of the data was successfully written. SetWriteTimeout(nsec int64) *os.Error; + + // SetLinger sets the behavior of Close() on a connection + // which still has data waiting to be sent or to be acknowledged. + // + // If sec < 0 (the default), Close returns immediately and + // the operating system finishes sending the data in the background. + // + // If sec == 0, Close returns immediately and the operating system + // discards any unsent or unacknowledged data. + // + // If sec > 0, Close blocks for at most sec seconds waiting for + // data to be sent and acknowledged. SetLinger(sec int) *os.Error; + + // SetReuseAddr sets whether it is okay to reuse addresses + // from recent connections that were not properly closed. SetReuseAddr(reuseaddr bool) *os.Error; + + // SetDontRoute sets whether outgoing messages should + // bypass the system routing tables. SetDontRoute(dontroute bool) *os.Error; + + // SetKeepAlive sets whether the operating system should send + // keepalive messages on the connection. SetKeepAlive(keepalive bool) *os.Error; + + // BindToDevice binds a connection to a particular network device. BindToDevice(dev string) *os.Error; } diff --git a/src/lib/net/parse_test.go b/src/lib/net/parse_test.go index d40a224e414..ddfeac15376 100644 --- a/src/lib/net/parse_test.go +++ b/src/lib/net/parse_test.go @@ -20,7 +20,6 @@ func TestReadLine(t *testing.T) { } br := bufio.NewBufRead(fd); - // TODO(rsc): 6g rejects "file :=" var file *file; file, err = open(filename); if file == nil { diff --git a/src/lib/net/timeout_test.go b/src/lib/net/timeout_test.go new file mode 100644 index 00000000000..e1ce9178906 --- /dev/null +++ b/src/lib/net/timeout_test.go @@ -0,0 +1,42 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import ( + "net"; + "testing"; + "time"; + "os"; +) + +func testTimeout(t *testing.T, network, addr string) { + fd, err := net.Dial(network, "", addr); + defer fd.Close(); + if err != nil { + t.Errorf("dial %s %s failed: %v", network, addr, err); + } + t0 := time.Nanoseconds(); + fd.SetReadTimeout(1e8); // 100ms + var b [100]byte; + n, err1 := fd.Read(b); + t1 := time.Nanoseconds(); + if n != 0 || err1 != os.EAGAIN { + t.Errorf("fd.Read on %s %s did not return 0, EAGAIN: %v, %v", network, addr, n, err1); + } + if t1 - t0 < 0.5e8 || t1 - t0 > 1.5e8 { + t.Errorf("fd.Read on %s %s took %f seconds, expected 0.1", network, addr, float64(t1 - t0) / 1e9); + } +} + +func TestTmeoutUDP(t *testing.T) { + testTimeout(t, "udp", "127.0.0.1:53"); +} + +func TestTimeoutTCP(t *testing.T) { + // 74.125.19.99 is www.google.com. + // could use dns, but dns depends on + // timeouts and this is the timeout test. + testTimeout(t, "tcp", "74.125.19.99:80"); +} diff --git a/src/lib/syscall/socket_darwin.go b/src/lib/syscall/socket_darwin.go index a0567e5c3a3..dc76b9bead3 100644 --- a/src/lib/syscall/socket_darwin.go +++ b/src/lib/syscall/socket_darwin.go @@ -68,7 +68,7 @@ func Setsockopt_tv(fd, level, opt, nsec int64) int64 { func Setsockopt_linger(fd, level, opt int64, sec int) int64 { var l Linger; - if sec != 0 { + if sec >= 0 { l.Yes = 1; l.Sec = int32(sec); } else { diff --git a/src/lib/syscall/socket_linux.go b/src/lib/syscall/socket_linux.go index a061577847f..39b9aa60f55 100644 --- a/src/lib/syscall/socket_linux.go +++ b/src/lib/syscall/socket_linux.go @@ -80,7 +80,7 @@ func Setsockopt_tv(fd, level, opt, nsec int64) int64 { func Setsockopt_linger(fd, level, opt int64, sec int) int64 { var l Linger; - if sec != 0 { + if sec >= 0 { l.Yes = 1; l.Sec = int32(sec) } else {