From 23e15f72538381dab83d02b3bf543cf95230d3e8 Mon Sep 17 00:00:00 2001 From: Dmitriy Vyukov Date: Fri, 9 Aug 2013 21:43:00 +0400 Subject: [PATCH] net: add special netFD mutex The mutex, fdMutex, handles locking and lifetime of sysfd, and serializes Read and Write methods. This allows to strip 2 sync.Mutex.Lock calls, 2 sync.Mutex.Unlock calls, 1 defer and some amount of misc overhead from every network operation. On linux/amd64, Intel E5-2690: benchmark old ns/op new ns/op delta BenchmarkTCP4Persistent 9595 9454 -1.47% BenchmarkTCP4Persistent-2 8978 8772 -2.29% BenchmarkTCP4ConcurrentReadWrite 4900 4625 -5.61% BenchmarkTCP4ConcurrentReadWrite-2 2603 2500 -3.96% In general it strips 70-500 ns from every network operation depending on processor model. On my relatively new E5-2690 it accounts to ~5% of network op cost. Fixes #6074. R=golang-dev, bradfitz, alex.brainman, iant, mikioh.mikioh CC=golang-dev https://golang.org/cl/12418043 --- src/pkg/net/fd_mutex.go | 184 +++++++++++++++++++++++++++++ src/pkg/net/fd_mutex_test.go | 186 ++++++++++++++++++++++++++++++ src/pkg/net/fd_poll_runtime.go | 2 +- src/pkg/net/fd_unix.go | 135 +++++++++++----------- src/pkg/net/fd_windows.go | 134 ++++++++++----------- src/pkg/net/sendfile_freebsd.go | 6 +- src/pkg/net/sendfile_linux.go | 6 +- src/pkg/net/sendfile_windows.go | 7 +- src/pkg/net/sockopt_posix.go | 8 +- src/pkg/net/sockoptip_bsd.go | 4 +- src/pkg/net/sockoptip_linux.go | 4 +- src/pkg/net/sockoptip_posix.go | 8 +- src/pkg/net/sockoptip_windows.go | 4 +- src/pkg/net/tcpsockopt_darwin.go | 2 +- src/pkg/net/tcpsockopt_openbsd.go | 2 +- src/pkg/net/tcpsockopt_posix.go | 2 +- src/pkg/net/tcpsockopt_unix.go | 2 +- src/pkg/net/tcpsockopt_windows.go | 2 +- src/pkg/runtime/mgc0.c | 4 +- src/pkg/runtime/mprof.goc | 4 +- src/pkg/runtime/netpoll.goc | 8 ++ src/pkg/runtime/proc.c | 2 +- src/pkg/runtime/race.c | 2 +- src/pkg/runtime/runtime.h | 2 +- src/pkg/runtime/sema.goc | 12 +- 25 files changed, 553 insertions(+), 179 deletions(-) create mode 100644 src/pkg/net/fd_mutex.go create mode 100644 src/pkg/net/fd_mutex_test.go diff --git a/src/pkg/net/fd_mutex.go b/src/pkg/net/fd_mutex.go new file mode 100644 index 00000000000..1caf974dd14 --- /dev/null +++ b/src/pkg/net/fd_mutex.go @@ -0,0 +1,184 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import "sync/atomic" + +// fdMutex is a specialized synchronization primitive +// that manages lifetime of an fd and serializes access +// to Read and Write methods on netFD. +type fdMutex struct { + state uint64 + rsema uint32 + wsema uint32 +} + +// fdMutex.state is organized as follows: +// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail. +// 1 bit - lock for read operations. +// 1 bit - lock for write operations. +// 20 bits - total number of references (read+write+misc). +// 20 bits - number of outstanding read waiters. +// 20 bits - number of outstanding write waiters. +const ( + mutexClosed = 1 << 0 + mutexRLock = 1 << 1 + mutexWLock = 1 << 2 + mutexRef = 1 << 3 + mutexRefMask = (1<<20 - 1) << 3 + mutexRWait = 1 << 23 + mutexRMask = (1<<20 - 1) << 23 + mutexWWait = 1 << 43 + mutexWMask = (1<<20 - 1) << 43 +) + +// Read operations must do RWLock(true)/RWUnlock(true). +// Write operations must do RWLock(false)/RWUnlock(false). +// Misc operations must do Incref/Decref. Misc operations include functions like +// setsockopt and setDeadline. 
They need to use Incref/Decref to ensure that +// they operate on the correct fd in presence of a concurrent Close call +// (otherwise fd can be closed under their feet). +// Close operation must do IncrefAndClose/Decref. + +// RWLock/Incref return whether fd is open. +// RWUnlock/Decref return whether fd is closed and there are no remaining references. + +func (mu *fdMutex) Incref() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + new := old + mutexRef + if new&mutexRefMask == 0 { + panic("net: inconsistent fdMutex") + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + return true + } + } +} + +func (mu *fdMutex) IncrefAndClose() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + // Mark as closed and acquire a reference. + new := (old | mutexClosed) + mutexRef + if new&mutexRefMask == 0 { + panic("net: inconsistent fdMutex") + } + // Remove all read and write waiters. + new &^= mutexRMask | mutexWMask + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + // Wake all read and write waiters, + // they will observe closed flag after wakeup. + for old&mutexRMask != 0 { + old -= mutexRWait + runtime_Semrelease(&mu.rsema) + } + for old&mutexWMask != 0 { + old -= mutexWWait + runtime_Semrelease(&mu.wsema) + } + return true + } + } +} + +func (mu *fdMutex) Decref() bool { + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexRefMask == 0 { + panic("net: inconsistent fdMutex") + } + new := old - mutexRef + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + return new&(mutexClosed|mutexRef) == mutexClosed + } + } +} + +func (mu *fdMutex) RWLock(read bool) bool { + var mutexBit, mutexWait, mutexMask uint64 + var mutexSema *uint32 + if read { + mutexBit = mutexRLock + mutexWait = mutexRWait + mutexMask = mutexRMask + mutexSema = &mu.rsema + } else { + mutexBit = mutexWLock + mutexWait = mutexWWait + mutexMask = mutexWMask + mutexSema = &mu.wsema + } + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexClosed != 0 { + return false + } + var new uint64 + if old&mutexBit == 0 { + // Lock is free, acquire it. + new = (old | mutexBit) + mutexRef + if new&mutexRefMask == 0 { + panic("net: inconsistent fdMutex") + } + } else { + // Wait for lock. + new = old + mutexWait + if new&mutexMask == 0 { + panic("net: inconsistent fdMutex") + } + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + if old&mutexBit == 0 { + return true + } + runtime_Semacquire(mutexSema) + // The signaller has subtracted mutexWait. + } + } +} + +func (mu *fdMutex) RWUnlock(read bool) bool { + var mutexBit, mutexWait, mutexMask uint64 + var mutexSema *uint32 + if read { + mutexBit = mutexRLock + mutexWait = mutexRWait + mutexMask = mutexRMask + mutexSema = &mu.rsema + } else { + mutexBit = mutexWLock + mutexWait = mutexWWait + mutexMask = mutexWMask + mutexSema = &mu.wsema + } + for { + old := atomic.LoadUint64(&mu.state) + if old&mutexBit == 0 || old&mutexRefMask == 0 { + panic("net: inconsistent fdMutex") + } + // Drop lock, drop reference and wake read waiter if present. + new := (old &^ mutexBit) - mutexRef + if old&mutexMask != 0 { + new -= mutexWait + } + if atomic.CompareAndSwapUint64(&mu.state, old, new) { + if old&mutexMask != 0 { + runtime_Semrelease(mutexSema) + } + return new&(mutexClosed|mutexRef) == mutexClosed + } + } +} + +// Implemented in runtime package. 
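Aside (not part of the patch): the constants above pack the whole mutex into a single uint64 — bit 0 is the closed flag, bits 1 and 2 are the read and write lock bits, and three 20-bit fields count references, read waiters and write waiters — which is why every state transition above is a single LoadUint64/CompareAndSwapUint64 loop. The sketch below is illustrative only; describeState and the demo program are made-up names, but the constants mirror the ones defined in this file.

// Illustrative sketch only (not part of the change): decoding a packed
// fdMutex state word using the same layout as the constants above.
package main

import "fmt"

const (
	mutexClosed  = 1 << 0
	mutexRLock   = 1 << 1
	mutexWLock   = 1 << 2
	mutexRef     = 1 << 3
	mutexRefMask = (1<<20 - 1) << 3
	mutexRWait   = 1 << 23
	mutexRMask   = (1<<20 - 1) << 23
	mutexWWait   = 1 << 43
	mutexWMask   = (1<<20 - 1) << 43
)

// describeState is a hypothetical helper that unpacks the state word.
func describeState(state uint64) string {
	return fmt.Sprintf("closed=%v rlock=%v wlock=%v refs=%d rwaiters=%d wwaiters=%d",
		state&mutexClosed != 0,
		state&mutexRLock != 0,
		state&mutexWLock != 0,
		(state&mutexRefMask)>>3,
		(state&mutexRMask)>>23,
		(state&mutexWMask)>>43)
}

func main() {
	// One read lock held (a lock also counts as a reference) plus one
	// goroutine queued for the write lock.
	state := uint64(mutexRLock|mutexRef) + mutexWWait
	fmt.Println(describeState(state))
	// Output: closed=false rlock=true wlock=false refs=1 rwaiters=0 wwaiters=1
}

The two declarations that follow are the semaphore hooks this file needs from the runtime; they are bound to the exported functions added to src/pkg/runtime/netpoll.goc later in this patch.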
+func runtime_Semacquire(sema *uint32) +func runtime_Semrelease(sema *uint32) diff --git a/src/pkg/net/fd_mutex_test.go b/src/pkg/net/fd_mutex_test.go new file mode 100644 index 00000000000..8383084b7a2 --- /dev/null +++ b/src/pkg/net/fd_mutex_test.go @@ -0,0 +1,186 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import ( + "math/rand" + "runtime" + "testing" + "time" +) + +func TestMutexLock(t *testing.T) { + var mu fdMutex + + if !mu.Incref() { + t.Fatal("broken") + } + if mu.Decref() { + t.Fatal("broken") + } + + if !mu.RWLock(true) { + t.Fatal("broken") + } + if mu.RWUnlock(true) { + t.Fatal("broken") + } + + if !mu.RWLock(false) { + t.Fatal("broken") + } + if mu.RWUnlock(false) { + t.Fatal("broken") + } +} + +func TestMutexClose(t *testing.T) { + var mu fdMutex + if !mu.IncrefAndClose() { + t.Fatal("broken") + } + + if mu.Incref() { + t.Fatal("broken") + } + if mu.RWLock(true) { + t.Fatal("broken") + } + if mu.RWLock(false) { + t.Fatal("broken") + } + if mu.IncrefAndClose() { + t.Fatal("broken") + } +} + +func TestMutexCloseUnblock(t *testing.T) { + c := make(chan bool) + var mu fdMutex + mu.RWLock(true) + for i := 0; i < 4; i++ { + go func() { + if mu.RWLock(true) { + t.Fatal("broken") + } + c <- true + }() + } + // Concurrent goroutines must not be able to read lock the mutex. + time.Sleep(time.Millisecond) + select { + case <-c: + t.Fatal("broken") + default: + } + mu.IncrefAndClose() // Must unblock the readers. + for i := 0; i < 4; i++ { + select { + case <-c: + case <-time.After(10 * time.Second): + t.Fatal("broken") + } + } + if mu.Decref() { + t.Fatal("broken") + } + if !mu.RWUnlock(true) { + t.Fatal("broken") + } +} + +func TestMutexPanic(t *testing.T) { + ensurePanics := func(f func()) { + defer func() { + if recover() == nil { + t.Fatal("does not panic") + } + }() + f() + } + + var mu fdMutex + ensurePanics(func() { mu.Decref() }) + ensurePanics(func() { mu.RWUnlock(true) }) + ensurePanics(func() { mu.RWUnlock(false) }) + + ensurePanics(func() { mu.Incref(); mu.Decref(); mu.Decref() }) + ensurePanics(func() { mu.RWLock(true); mu.RWUnlock(true); mu.RWUnlock(true) }) + ensurePanics(func() { mu.RWLock(false); mu.RWUnlock(false); mu.RWUnlock(false) }) + + // ensure that it's still not broken + mu.Incref() + mu.Decref() + mu.RWLock(true) + mu.RWUnlock(true) + mu.RWLock(false) + mu.RWUnlock(false) +} + +func TestMutexStress(t *testing.T) { + P := 8 + N := int(1e6) + if testing.Short() { + P = 4 + N = 1e4 + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P)) + done := make(chan bool) + var mu fdMutex + var readState [2]uint64 + var writeState [2]uint64 + for p := 0; p < P; p++ { + go func() { + r := rand.New(rand.NewSource(rand.Int63())) + for i := 0; i < N; i++ { + switch r.Intn(3) { + case 0: + if !mu.Incref() { + t.Fatal("broken") + } + if mu.Decref() { + t.Fatal("broken") + } + case 1: + if !mu.RWLock(true) { + t.Fatal("broken") + } + // Ensure that it provides mutual exclusion for readers. + if readState[0] != readState[1] { + t.Fatal("broken") + } + readState[0]++ + readState[1]++ + if mu.RWUnlock(true) { + t.Fatal("broken") + } + case 2: + if !mu.RWLock(false) { + t.Fatal("broken") + } + // Ensure that it provides mutual exclusion for writers. 
+ if writeState[0] != writeState[1] { + t.Fatal("broken") + } + writeState[0]++ + writeState[1]++ + if mu.RWUnlock(false) { + t.Fatal("broken") + } + } + } + done <- true + }() + } + for p := 0; p < P; p++ { + <-done + } + if !mu.IncrefAndClose() { + t.Fatal("broken") + } + if !mu.Decref() { + t.Fatal("broken") + } +} diff --git a/src/pkg/net/fd_poll_runtime.go b/src/pkg/net/fd_poll_runtime.go index 6ae5c609ac4..03474cf2c37 100644 --- a/src/pkg/net/fd_poll_runtime.go +++ b/src/pkg/net/fd_poll_runtime.go @@ -132,7 +132,7 @@ func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { if t.IsZero() { d = 0 } - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode) diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go index a2a771491e4..f704c0a2a0a 100644 --- a/src/pkg/net/fd_unix.go +++ b/src/pkg/net/fd_unix.go @@ -10,7 +10,6 @@ import ( "io" "os" "runtime" - "sync" "sync/atomic" "syscall" "time" @@ -18,13 +17,8 @@ import ( // Network file descriptor. type netFD struct { - // locking/lifetime of sysfd - sysmu sync.Mutex - sysref int - - // must lock both sysmu and pollDesc to write - // can lock either to read - closing bool + // locking/lifetime of sysfd + serialize access to Read and Write methods + fdmu fdMutex // immutable until Close sysfd int @@ -35,9 +29,6 @@ type netFD struct { laddr Addr raddr Addr - // serialize access to Read and Write methods - rio, wio sync.Mutex - // wait server pd pollDesc } @@ -84,8 +75,9 @@ func (fd *netFD) name() string { } func (fd *netFD) connect(la, ra syscall.Sockaddr) error { - fd.wio.Lock() - defer fd.wio.Unlock() + // Do not need to call fd.writeLock here, + // because fd is not yet accessible to user, + // so no concurrent operations are possible. if err := fd.pd.PrepareWrite(); err != nil { return err } @@ -104,44 +96,69 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { return nil } +func (fd *netFD) destroy() { + // Poller may want to unregister fd in readiness notification mechanism, + // so this must be executed before closesocket. + fd.pd.Close() + closesocket(fd.sysfd) + fd.sysfd = -1 + runtime.SetFinalizer(fd, nil) +} + // Add a reference to this fd. -// If closing==true, pollDesc must be locked; mark the fd as closing. // Returns an error if the fd cannot be used. -func (fd *netFD) incref(closing bool) error { - fd.sysmu.Lock() - if fd.closing { - fd.sysmu.Unlock() +func (fd *netFD) incref() error { + if !fd.fdmu.Incref() { return errClosing } - fd.sysref++ - if closing { - fd.closing = true - } - fd.sysmu.Unlock() return nil } -// Remove a reference to this FD and close if we've been asked to do so (and -// there are no references left. +// Remove a reference to this FD and close if we've been asked to do so +// (and there are no references left). func (fd *netFD) decref() { - fd.sysmu.Lock() - fd.sysref-- - if fd.closing && fd.sysref == 0 { - // Poller may want to unregister fd in readiness notification mechanism, - // so this must be executed before closesocket. - fd.pd.Close() - closesocket(fd.sysfd) - fd.sysfd = -1 - runtime.SetFinalizer(fd, nil) + if fd.fdmu.Decref() { + fd.destroy() + } +} + +// Add a reference to this fd and lock for reading. +// Returns an error if the fd cannot be used. +func (fd *netFD) readLock() error { + if !fd.fdmu.RWLock(true) { + return errClosing + } + return nil +} + +// Unlock for reading and remove a reference to this FD. 
+func (fd *netFD) readUnlock() { + if fd.fdmu.RWUnlock(true) { + fd.destroy() + } +} + +// Add a reference to this fd and lock for writing. +// Returns an error if the fd cannot be used. +func (fd *netFD) writeLock() error { + if !fd.fdmu.RWLock(false) { + return errClosing + } + return nil +} + +// Unlock for writing and remove a reference to this FD. +func (fd *netFD) writeUnlock() { + if fd.fdmu.RWUnlock(false) { + fd.destroy() } - fd.sysmu.Unlock() } func (fd *netFD) Close() error { fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict - if err := fd.incref(true); err != nil { + if !fd.fdmu.IncrefAndClose() { fd.pd.Unlock() - return err + return errClosing } // Unblock any I/O. Once it all unblocks and returns, // so that it cannot be referring to fd.sysfd anymore, @@ -158,7 +175,7 @@ func (fd *netFD) Close() error { } func (fd *netFD) shutdown(how int) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -178,12 +195,10 @@ func (fd *netFD) CloseWrite() error { } func (fd *netFD) Read(p []byte) (n int, err error) { - fd.rio.Lock() - defer fd.rio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return 0, err } - defer fd.decref() + defer fd.readUnlock() if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} } @@ -207,12 +222,10 @@ func (fd *netFD) Read(p []byte) (n int, err error) { } func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { - fd.rio.Lock() - defer fd.rio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return 0, nil, err } - defer fd.decref() + defer fd.readUnlock() if err := fd.pd.PrepareRead(); err != nil { return 0, nil, &OpError{"read", fd.net, fd.laddr, err} } @@ -236,12 +249,10 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { } func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) { - fd.rio.Lock() - defer fd.rio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return 0, 0, 0, nil, err } - defer fd.decref() + defer fd.readUnlock() if err := fd.pd.PrepareRead(); err != nil { return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err} } @@ -272,12 +283,10 @@ func chkReadErr(n int, err error, fd *netFD) error { } func (fd *netFD) Write(p []byte) (nn int, err error) { - fd.wio.Lock() - defer fd.wio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.writeLock(); err != nil { return 0, err } - defer fd.decref() + defer fd.writeUnlock() if err := fd.pd.PrepareWrite(); err != nil { return 0, &OpError{"write", fd.net, fd.raddr, err} } @@ -311,12 +320,10 @@ func (fd *netFD) Write(p []byte) (nn int, err error) { } func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { - fd.wio.Lock() - defer fd.wio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.writeLock(); err != nil { return 0, err } - defer fd.decref() + defer fd.writeUnlock() if err := fd.pd.PrepareWrite(); err != nil { return 0, &OpError{"write", fd.net, fd.raddr, err} } @@ -338,12 +345,10 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { } func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) { - fd.wio.Lock() - defer fd.wio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.writeLock(); err != nil { return 0, 0, err } - defer 
fd.decref() + defer fd.writeUnlock() if err := fd.pd.PrepareWrite(); err != nil { return 0, 0, &OpError{"write", fd.net, fd.raddr, err} } @@ -366,12 +371,10 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob } func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) { - fd.rio.Lock() - defer fd.rio.Unlock() - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return nil, err } - defer fd.decref() + defer fd.readUnlock() var s int var rsa syscall.Sockaddr diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go index ff3966e4334..ff0190240b9 100644 --- a/src/pkg/net/fd_windows.go +++ b/src/pkg/net/fd_windows.go @@ -105,7 +105,6 @@ type operation struct { qty uint32 // fields used only by net package - mu sync.Mutex fd *netFD errc chan error buf syscall.WSABuf @@ -246,10 +245,8 @@ func startServer() { // Network file descriptor. type netFD struct { - // locking/lifetime of sysfd - sysmu sync.Mutex - sysref int - closing bool + // locking/lifetime of sysfd + serialize access to Read and Write methods + fdmu fdMutex // immutable until Close sysfd syscall.Handle @@ -313,6 +310,9 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { } func (fd *netFD) connect(la, ra syscall.Sockaddr) error { + // Do not need to call fd.writeLock here, + // because fd is not yet accessible to user, + // so no concurrent operations are possible. if !canUseConnectEx(fd.net) { return syscall.Connect(fd.sysfd, ra) } @@ -332,8 +332,6 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { } // Call ConnectEx API. o := &fd.wop - o.mu.Lock() - defer o.mu.Unlock() o.sa = ra _, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error { return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) @@ -345,64 +343,80 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { return syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) } +func (fd *netFD) destroy() { + if fd.sysfd == syscall.InvalidHandle { + return + } + // Poller may want to unregister fd in readiness notification mechanism, + // so this must be executed before closesocket. + fd.pd.Close() + closesocket(fd.sysfd) + fd.sysfd = syscall.InvalidHandle + // no need for a finalizer anymore + runtime.SetFinalizer(fd, nil) +} + // Add a reference to this fd. -// If closing==true, mark the fd as closing. // Returns an error if the fd cannot be used. -func (fd *netFD) incref(closing bool) error { - if fd == nil { +func (fd *netFD) incref() error { + if !fd.fdmu.Incref() { return errClosing } - fd.sysmu.Lock() - if fd.closing { - fd.sysmu.Unlock() - return errClosing - } - fd.sysref++ - if closing { - fd.closing = true - } - closing = fd.closing - fd.sysmu.Unlock() return nil } -// Remove a reference to this FD and close if we've been asked to do so (and -// there are no references left. +// Remove a reference to this FD and close if we've been asked to do so +// (and there are no references left). func (fd *netFD) decref() { - if fd == nil { - return + if fd.fdmu.Decref() { + fd.destroy() } - fd.sysmu.Lock() - fd.sysref-- - if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle { - // Poller may want to unregister fd in readiness notification mechanism, - // so this must be executed before closesocket. 
- fd.pd.Close() - closesocket(fd.sysfd) - fd.sysfd = syscall.InvalidHandle - // no need for a finalizer anymore - runtime.SetFinalizer(fd, nil) +} + +// Add a reference to this fd and lock for reading. +// Returns an error if the fd cannot be used. +func (fd *netFD) readLock() error { + if !fd.fdmu.RWLock(true) { + return errClosing + } + return nil +} + +// Unlock for reading and remove a reference to this FD. +func (fd *netFD) readUnlock() { + if fd.fdmu.RWUnlock(true) { + fd.destroy() + } +} + +// Add a reference to this fd and lock for writing. +// Returns an error if the fd cannot be used. +func (fd *netFD) writeLock() error { + if !fd.fdmu.RWLock(false) { + return errClosing + } + return nil +} + +// Unlock for writing and remove a reference to this FD. +func (fd *netFD) writeUnlock() { + if fd.fdmu.RWUnlock(false) { + fd.destroy() } - fd.sysmu.Unlock() } func (fd *netFD) Close() error { - if err := fd.incref(true); err != nil { - return err + if !fd.fdmu.IncrefAndClose() { + return errClosing } - defer fd.decref() // unblock pending reader and writer fd.pd.Evict() - // wait for both reader and writer to exit - fd.rop.mu.Lock() - fd.wop.mu.Lock() - fd.rop.mu.Unlock() - fd.wop.mu.Unlock() + fd.decref() return nil } func (fd *netFD) shutdown(how int) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -422,13 +436,11 @@ func (fd *netFD) CloseWrite() error { } func (fd *netFD) Read(buf []byte) (int, error) { - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return 0, err } - defer fd.decref() + defer fd.readUnlock() o := &fd.rop - o.mu.Lock() - defer o.mu.Unlock() o.InitBuf(buf) n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error { return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) @@ -443,13 +455,11 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { if len(buf) == 0 { return 0, nil, nil } - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return 0, nil, err } - defer fd.decref() + defer fd.readUnlock() o := &fd.rop - o.mu.Lock() - defer o.mu.Unlock() o.InitBuf(buf) n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { if o.rsa == nil { @@ -466,13 +476,11 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { } func (fd *netFD) Write(buf []byte) (int, error) { - if err := fd.incref(false); err != nil { + if err := fd.writeLock(); err != nil { return 0, err } - defer fd.decref() + defer fd.writeUnlock() o := &fd.wop - o.mu.Lock() - defer o.mu.Unlock() o.InitBuf(buf) return iosrv.ExecIO(o, "WSASend", func(o *operation) error { return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) @@ -483,13 +491,11 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { if len(buf) == 0 { return 0, nil } - if err := fd.incref(false); err != nil { + if err := fd.writeLock(); err != nil { return 0, err } - defer fd.decref() + defer fd.writeUnlock() o := &fd.wop - o.mu.Lock() - defer o.mu.Unlock() o.InitBuf(buf) o.sa = sa return iosrv.ExecIO(o, "WSASendto", func(o *operation) error { @@ -498,10 +504,10 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { } func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { - if err := fd.incref(false); err != nil { + if err := fd.readLock(); err != nil { return nil, err } - defer fd.decref() + defer fd.readUnlock() // Get new socket. 
s, err := sysSocket(fd.family, fd.sotype, 0) @@ -522,8 +528,6 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { // Submit accept request. o := &fd.rop - o.mu.Lock() - defer o.mu.Unlock() o.handle = s var rawsa [2]syscall.RawSockaddrAny o.rsan = int32(unsafe.Sizeof(rawsa[0])) diff --git a/src/pkg/net/sendfile_freebsd.go b/src/pkg/net/sendfile_freebsd.go index dc5b767557b..42fe799efbd 100644 --- a/src/pkg/net/sendfile_freebsd.go +++ b/src/pkg/net/sendfile_freebsd.go @@ -58,12 +58,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, err, false } - c.wio.Lock() - defer c.wio.Unlock() - if err := c.incref(false); err != nil { + if err := c.writeLock(); err != nil { return 0, err, true } - defer c.decref() + defer c.writeUnlock() dst := c.sysfd src := int(f.Fd()) diff --git a/src/pkg/net/sendfile_linux.go b/src/pkg/net/sendfile_linux.go index 6f1323b3dcd..5e117636a80 100644 --- a/src/pkg/net/sendfile_linux.go +++ b/src/pkg/net/sendfile_linux.go @@ -36,12 +36,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, false } - c.wio.Lock() - defer c.wio.Unlock() - if err := c.incref(false); err != nil { + if err := c.writeLock(); err != nil { return 0, err, true } - defer c.decref() + defer c.writeUnlock() dst := c.sysfd src := int(f.Fd()) diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go index e9b9f91da54..0107f679b38 100644 --- a/src/pkg/net/sendfile_windows.go +++ b/src/pkg/net/sendfile_windows.go @@ -34,13 +34,12 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, false } - if err := fd.incref(false); err != nil { + if err := fd.writeLock(); err != nil { return 0, err, true } - defer fd.decref() + defer fd.writeUnlock() + o := &fd.wop - o.mu.Lock() - defer o.mu.Unlock() o.qty = uint32(n) o.handle = syscall.Handle(f.Fd()) done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error { diff --git a/src/pkg/net/sockopt_posix.go b/src/pkg/net/sockopt_posix.go index 886afc2c75e..da2742c9a4a 100644 --- a/src/pkg/net/sockopt_posix.go +++ b/src/pkg/net/sockopt_posix.go @@ -101,7 +101,7 @@ done: } func setReadBuffer(fd *netFD, bytes int) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -109,7 +109,7 @@ func setReadBuffer(fd *netFD, bytes int) error { } func setWriteBuffer(fd *netFD, bytes int) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -117,7 +117,7 @@ func setWriteBuffer(fd *netFD, bytes int) error { } func setKeepAlive(fd *netFD, keepalive bool) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -133,7 +133,7 @@ func setLinger(fd *netFD, sec int) error { l.Onoff = 0 l.Linger = 0 } - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/sockoptip_bsd.go b/src/pkg/net/sockoptip_bsd.go index bcae43c31d4..ca080fd7e4d 100644 --- a/src/pkg/net/sockoptip_bsd.go +++ b/src/pkg/net/sockoptip_bsd.go @@ -18,7 +18,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { } var a [4]byte copy(a[:], ip.To4()) - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -26,7 +26,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { } 
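The sockopt and keep-alive helpers in the rest of this patch all follow the same mechanical rewrite: fd.incref(false) becomes fd.incref(), with the matching decref unchanged. A sketch of the resulting call-site pattern, assuming the netFD helpers introduced above and the usual os/syscall imports of the unix sockopt files; setHypotheticalOption is a made-up name standing in for setReadBuffer, setNoDelay and friends:

func setHypotheticalOption(fd *netFD, v int) error {
	// A plain reference neither excludes Read/Write nor other misc
	// operations; it only guarantees that a concurrent Close cannot
	// invalidate fd.sysfd while this call is in flight.
	if err := fd.incref(); err != nil {
		return err // errClosing: the fd was already closed
	}
	// If Close ran concurrently, the descriptor is destroyed by the
	// last decref rather than by Close itself.
	defer fd.decref()
	return os.NewSyscallError("setsockopt",
		syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, v))
}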
func setIPv4MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/sockoptip_linux.go b/src/pkg/net/sockoptip_linux.go index f9cf938d70c..a69b778e4d1 100644 --- a/src/pkg/net/sockoptip_linux.go +++ b/src/pkg/net/sockoptip_linux.go @@ -15,7 +15,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { v = int32(ifi.Index) } mreq := &syscall.IPMreqn{Ifindex: v} - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -23,7 +23,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { } func setIPv4MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/sockoptip_posix.go b/src/pkg/net/sockoptip_posix.go index c82eef0f5f5..5c2a5872f49 100644 --- a/src/pkg/net/sockoptip_posix.go +++ b/src/pkg/net/sockoptip_posix.go @@ -16,7 +16,7 @@ func joinIPv4Group(fd *netFD, ifi *Interface, ip IP) error { if err := setIPv4MreqToInterface(mreq, ifi); err != nil { return err } - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -28,7 +28,7 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error { if ifi != nil { v = ifi.Index } - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -36,7 +36,7 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error { } func setIPv6MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -49,7 +49,7 @@ func joinIPv6Group(fd *netFD, ifi *Interface, ip IP) error { if ifi != nil { mreq.Interface = uint32(ifi.Index) } - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/sockoptip_windows.go b/src/pkg/net/sockoptip_windows.go index fbaf0ed6f45..7b11f207aaf 100644 --- a/src/pkg/net/sockoptip_windows.go +++ b/src/pkg/net/sockoptip_windows.go @@ -17,7 +17,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { } var a [4]byte copy(a[:], ip.To4()) - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() @@ -25,7 +25,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { } func setIPv4MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/tcpsockopt_darwin.go b/src/pkg/net/tcpsockopt_darwin.go index d052a140d74..33140849c95 100644 --- a/src/pkg/net/tcpsockopt_darwin.go +++ b/src/pkg/net/tcpsockopt_darwin.go @@ -14,7 +14,7 @@ import ( // Set keep alive period. func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/tcpsockopt_openbsd.go b/src/pkg/net/tcpsockopt_openbsd.go index 306f4e050d2..3480f932c80 100644 --- a/src/pkg/net/tcpsockopt_openbsd.go +++ b/src/pkg/net/tcpsockopt_openbsd.go @@ -14,7 +14,7 @@ import ( // Set keep alive period. 
func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/tcpsockopt_posix.go b/src/pkg/net/tcpsockopt_posix.go index afd80644a18..8b41b2117dc 100644 --- a/src/pkg/net/tcpsockopt_posix.go +++ b/src/pkg/net/tcpsockopt_posix.go @@ -12,7 +12,7 @@ import ( ) func setNoDelay(fd *netFD, noDelay bool) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/tcpsockopt_unix.go b/src/pkg/net/tcpsockopt_unix.go index dfc0452d294..fba2acdb601 100644 --- a/src/pkg/net/tcpsockopt_unix.go +++ b/src/pkg/net/tcpsockopt_unix.go @@ -14,7 +14,7 @@ import ( // Set keep alive period. func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/net/tcpsockopt_windows.go b/src/pkg/net/tcpsockopt_windows.go index 538366d909a..0bf4312f248 100644 --- a/src/pkg/net/tcpsockopt_windows.go +++ b/src/pkg/net/tcpsockopt_windows.go @@ -11,7 +11,7 @@ import ( ) func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(false); err != nil { + if err := fd.incref(); err != nil { return err } defer fd.decref() diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c index abf5df10c11..3c7df994751 100644 --- a/src/pkg/runtime/mgc0.c +++ b/src/pkg/runtime/mgc0.c @@ -2019,7 +2019,7 @@ runtime·gc(int32 force) if(gcpercent < 0) return; - runtime·semacquire(&runtime·worldsema); + runtime·semacquire(&runtime·worldsema, false); if(!force && mstats.heap_alloc < mstats.next_gc) { // typically threads which lost the race to grab // worldsema exit here when gc is done. @@ -2218,7 +2218,7 @@ runtime·ReadMemStats(MStats *stats) // because stoptheworld can only be used by // one goroutine at a time, and there might be // a pending garbage collection already calling it. 
- runtime·semacquire(&runtime·worldsema); + runtime·semacquire(&runtime·worldsema, false); m->gcing = 1; runtime·stoptheworld(); updatememstats(nil); diff --git a/src/pkg/runtime/mprof.goc b/src/pkg/runtime/mprof.goc index 0d89a267b9e..6e51ef3eb16 100644 --- a/src/pkg/runtime/mprof.goc +++ b/src/pkg/runtime/mprof.goc @@ -447,7 +447,7 @@ func Stack(b Slice, all bool) (n int) { pc = (uintptr)runtime·getcallerpc(&b); if(all) { - runtime·semacquire(&runtime·worldsema); + runtime·semacquire(&runtime·worldsema, false); m->gcing = 1; runtime·stoptheworld(); } @@ -494,7 +494,7 @@ func GoroutineProfile(b Slice) (n int, ok bool) { ok = false; n = runtime·gcount(); if(n <= b.len) { - runtime·semacquire(&runtime·worldsema); + runtime·semacquire(&runtime·worldsema, false); m->gcing = 1; runtime·stoptheworld(); diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc index ebe6defa00b..ec6a4113fbc 100644 --- a/src/pkg/runtime/netpoll.goc +++ b/src/pkg/runtime/netpoll.goc @@ -206,6 +206,14 @@ func runtime_pollUnblock(pd *PollDesc) { runtime·ready(wg); } +func runtime_Semacquire(addr *uint32) { + runtime·semacquire(addr, true); +} + +func runtime_Semrelease(addr *uint32) { + runtime·semrelease(addr); +} + uintptr runtime·netpollfd(PollDesc *pd) { diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c index 1c39807e00d..95b39b6d5e9 100644 --- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -1836,7 +1836,7 @@ runtime·gomaxprocsfunc(int32 n) } runtime·unlock(&runtime·sched); - runtime·semacquire(&runtime·worldsema); + runtime·semacquire(&runtime·worldsema, false); m->gcing = 1; runtime·stoptheworld(); newprocs = n; diff --git a/src/pkg/runtime/race.c b/src/pkg/runtime/race.c index 557da6f8e35..875375da285 100644 --- a/src/pkg/runtime/race.c +++ b/src/pkg/runtime/race.c @@ -326,7 +326,7 @@ runtime·RaceReleaseMerge(void *addr) void runtime·RaceSemacquire(uint32 *s) { - runtime·semacquire(s); + runtime·semacquire(s, false); } // func RaceSemrelease(s *uint32) diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index 7d04a75424a..a3edb5e9551 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -1021,7 +1021,7 @@ bool runtime·isInf(float64 f, int32 sign); bool runtime·isNaN(float64 f); float64 runtime·ldexp(float64 d, int32 e); float64 runtime·modf(float64 d, float64 *ip); -void runtime·semacquire(uint32*); +void runtime·semacquire(uint32*, bool); void runtime·semrelease(uint32*); int32 runtime·gomaxprocsfunc(int32 n); void runtime·procyield(uint32); diff --git a/src/pkg/runtime/sema.goc b/src/pkg/runtime/sema.goc index 4df01fc4e49..05222e2df72 100644 --- a/src/pkg/runtime/sema.goc +++ b/src/pkg/runtime/sema.goc @@ -98,8 +98,8 @@ cansemacquire(uint32 *addr) return 0; } -static void -semacquireimpl(uint32 volatile *addr, int32 profile) +void +runtime·semacquire(uint32 volatile *addr, bool profile) { Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it SemaRoot *root; @@ -144,12 +144,6 @@ semacquireimpl(uint32 volatile *addr, int32 profile) } } -void -runtime·semacquire(uint32 volatile *addr) -{ - semacquireimpl(addr, 0); -} - void runtime·semrelease(uint32 volatile *addr) { @@ -189,7 +183,7 @@ runtime·semrelease(uint32 volatile *addr) } func runtime_Semacquire(addr *uint32) { - semacquireimpl(addr, 1); + runtime·semacquire(addr, true); } func runtime_Semrelease(addr *uint32) {
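On the runtime side, runtime·semacquire now takes an explicit profile flag: the semaphores handed out to package net (and to package sync) pass true, so contention on fdMutex's read/write semaphores shows up in the blocking profiler, while runtime-internal acquisitions of worldsema pass false and stay out of user profiles.

None of this changes the externally visible contract that the new locking has to honour: Close must unblock an in-flight Read and must not tear down the descriptor while that Read still holds a reference. A small self-contained program exercising exactly that path (illustrative only, not part of the patch):

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	go func() {
		c, err := ln.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		select {} // never write, so the client's Read blocks
	}()

	c, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}
	done := make(chan error, 1)
	go func() {
		var buf [1]byte
		_, err := c.Read(buf[:]) // blocks in the poller, holding the fd's read lock
		done <- err
	}()

	time.Sleep(100 * time.Millisecond)
	c.Close()                             // IncrefAndClose + poller eviction wake the reader
	fmt.Println("Read returned:", <-done) // typically "use of closed network connection"
}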