mirror of
https://github.com/golang/go
synced 2024-11-17 13:35:08 -07:00
runtime: speed up receive on empty closed channel
Currently, nonblocking receive on an open channel is about 700 times faster than nonblocking receive on a closed channel. This change makes closed channels equally fast. Fixes #32529. Includes a correction based on #36714. relevant benchstat output: name old time/op new time/op delta MakeChan/Byte-40 140ns ± 4% 137ns ± 7% -2.38% (p=0.023 n=17+19) MakeChan/Int-40 174ns ± 5% 173ns ± 6% ~ (p=0.437 n=18+19) MakeChan/Ptr-40 315ns ±15% 301ns ±15% ~ (p=0.051 n=20+20) MakeChan/Struct/0-40 123ns ± 8% 99ns ±11% -19.18% (p=0.000 n=20+17) MakeChan/Struct/32-40 297ns ± 8% 241ns ±18% -19.13% (p=0.000 n=20+20) MakeChan/Struct/40-40 344ns ± 5% 273ns ±23% -20.49% (p=0.000 n=20+20) ChanNonblocking-40 0.32ns ± 2% 0.32ns ± 2% -1.25% (p=0.000 n=19+18) SelectUncontended-40 5.72ns ± 1% 5.71ns ± 2% ~ (p=0.326 n=19+19) SelectSyncContended-40 10.9µs ±10% 10.6µs ± 3% -2.77% (p=0.009 n=20+16) SelectAsyncContended-40 1.00µs ± 0% 1.10µs ± 0% +10.75% (p=0.000 n=18+19) SelectNonblock-40 1.22ns ± 2% 1.21ns ± 4% ~ (p=0.141 n=18+19) ChanUncontended-40 240ns ± 4% 233ns ± 4% -2.82% (p=0.000 n=20+20) ChanContended-40 86.7µs ± 0% 82.7µs ± 0% -4.64% (p=0.000 n=20+19) ChanSync-40 294ns ± 7% 284ns ± 9% -3.44% (p=0.006 n=20+20) ChanSyncWork-40 38.4µs ±19% 34.0µs ± 4% -11.33% (p=0.000 n=20+18) ChanProdCons0-40 1.50µs ± 1% 1.63µs ± 0% +8.53% (p=0.000 n=19+19) ChanProdCons10-40 1.17µs ± 0% 1.18µs ± 1% +0.44% (p=0.000 n=19+20) ChanProdCons100-40 985ns ± 0% 959ns ± 1% -2.64% (p=0.000 n=20+20) ChanProdConsWork0-40 1.50µs ± 0% 1.60µs ± 2% +6.54% (p=0.000 n=18+20) ChanProdConsWork10-40 1.26µs ± 0% 1.26µs ± 2% +0.40% (p=0.015 n=20+19) ChanProdConsWork100-40 1.27µs ± 0% 1.22µs ± 0% -4.15% (p=0.000 n=20+19) SelectProdCons-40 1.50µs ± 1% 1.53µs ± 1% +1.95% (p=0.000 n=20+20) ChanCreation-40 82.1ns ± 5% 81.6ns ± 7% ~ (p=0.483 n=19+19) ChanSem-40 877ns ± 0% 719ns ± 0% -17.98% (p=0.000 n=18+19) ChanPopular-40 1.75ms ± 2% 1.78ms ± 3% +1.76% (p=0.002 n=20+19) ChanClosed-40 215ns ± 1% 0ns ± 6% -99.82% (p=0.000 n=20+18) Previously committed in CL 181543 and reverted in CL 216158. Change-Id: Ib767b08d724cfad03598d77271dbc1087485feb8 Reviewed-on: https://go-review.googlesource.com/c/go/+/216818 Run-TryBot: Ian Lance Taylor <iant@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Keith Randall <khr@golang.org>
This commit is contained in:
parent
9a2db7c41b
commit
02e492f1a3
@ -121,6 +121,21 @@ func chanbuf(c *hchan, i uint) unsafe.Pointer {
|
||||
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
|
||||
}
|
||||
|
||||
// full reports whether a send on c would block (that is, the channel is full).
|
||||
// It uses a single word-sized read of mutable state, so although
|
||||
// the answer is instantaneously true, the correct answer may have changed
|
||||
// by the time the calling function receives the return value.
|
||||
func full(c *hchan) bool {
|
||||
// c.dataqsiz is immutable (never written after the channel is created)
|
||||
// so it is safe to read at any time during channel operation.
|
||||
if c.dataqsiz == 0 {
|
||||
// Assumes that a pointer read is relaxed-atomic.
|
||||
return c.recvq.first == nil
|
||||
}
|
||||
// Assumes that a uint read is relaxed-atomic.
|
||||
return c.qcount == c.dataqsiz
|
||||
}
|
||||
|
||||
// entry point for c <- x from compiled code
|
||||
//go:nosplit
|
||||
func chansend1(c *hchan, elem unsafe.Pointer) {
|
||||
@ -160,7 +175,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
|
||||
//
|
||||
// After observing that the channel is not closed, we observe that the channel is
|
||||
// not ready for sending. Each of these observations is a single word-sized read
|
||||
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
|
||||
// (first c.closed and second full()).
|
||||
// Because a closed channel cannot transition from 'ready for sending' to
|
||||
// 'not ready for sending', even if the channel is closed between the two observations,
|
||||
// they imply a moment between the two when the channel was both not yet closed
|
||||
@ -169,9 +184,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
|
||||
//
|
||||
// It is okay if the reads are reordered here: if we observe that the channel is not
|
||||
// ready for sending and then observe that it is not closed, that implies that the
|
||||
// channel wasn't closed during the first observation.
|
||||
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
|
||||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
|
||||
// channel wasn't closed during the first observation. However, nothing here
|
||||
// guarantees forward progress. We rely on the side effects of lock release in
|
||||
// chanrecv() and closechan() to update this thread's view of c.closed and full().
|
||||
if !block && c.closed == 0 && full(c) {
|
||||
return false
|
||||
}
|
||||
|
||||
@ -401,6 +417,16 @@ func closechan(c *hchan) {
|
||||
}
|
||||
}
|
||||
|
||||
// empty reports whether a read from c would block (that is, the channel is
|
||||
// empty). It uses a single atomic read of mutable state.
|
||||
func empty(c *hchan) bool {
|
||||
// c.dataqsiz is immutable.
|
||||
if c.dataqsiz == 0 {
|
||||
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
|
||||
}
|
||||
return atomic.Loaduint(&c.qcount) == 0
|
||||
}
|
||||
|
||||
// entry points for <- c from compiled code
|
||||
//go:nosplit
|
||||
func chanrecv1(c *hchan, elem unsafe.Pointer) {
|
||||
@ -436,22 +462,37 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
|
||||
}
|
||||
|
||||
// Fast path: check for failed non-blocking operation without acquiring the lock.
|
||||
if !block && empty(c) {
|
||||
// After observing that the channel is not ready for receiving, we observe whether the
|
||||
// channel is closed.
|
||||
//
|
||||
// After observing that the channel is not ready for receiving, we observe that the
|
||||
// channel is not closed. Each of these observations is a single word-sized read
|
||||
// (first c.sendq.first or c.qcount, and second c.closed).
|
||||
// Reordering of these checks could lead to incorrect behavior when racing with a close.
|
||||
// For example, if the channel was open and not empty, was closed, and then drained,
|
||||
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
|
||||
// we use atomic loads for both checks, and rely on emptying and closing to happen in
|
||||
// separate critical sections under the same lock. This assumption fails when closing
|
||||
// an unbuffered channel with a blocked send, but that is an error condition anyway.
|
||||
if atomic.Load(&c.closed) == 0 {
|
||||
// Because a channel cannot be reopened, the later observation of the channel
|
||||
// being not closed implies that it was also not closed at the moment of the
|
||||
// first observation. We behave as if we observed the channel at that moment
|
||||
// and report that the receive cannot proceed.
|
||||
//
|
||||
// The order of operations is important here: reversing the operations can lead to
|
||||
// incorrect behavior when racing with a close.
|
||||
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
|
||||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
|
||||
atomic.Load(&c.closed) == 0 {
|
||||
return
|
||||
}
|
||||
// The channel is irreversibly closed. Re-check whether the channel has any pending data
|
||||
// to receive, which could have arrived between the empty and closed checks above.
|
||||
// Sequential consistency is also required here, when racing with such a send.
|
||||
if empty(c) {
|
||||
// The channel is irreversibly closed and empty.
|
||||
if raceenabled {
|
||||
raceacquire(c.raceaddr())
|
||||
}
|
||||
if ep != nil {
|
||||
typedmemclr(c.elemtype, ep)
|
||||
}
|
||||
return true, false
|
||||
}
|
||||
}
|
||||
|
||||
var t0 int64
|
||||
if blockprofilerate > 0 {
|
||||
|
@ -1127,6 +1127,20 @@ func BenchmarkChanPopular(b *testing.B) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkChanClosed(b *testing.B) {
|
||||
c := make(chan struct{})
|
||||
close(c)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
select {
|
||||
case <-c:
|
||||
default:
|
||||
b.Error("Unreachable")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
var (
|
||||
alwaysFalse = false
|
||||
workSink = 0
|
||||
|
Loading…
Reference in New Issue
Block a user