mirror of
https://github.com/golang/go
synced 2024-09-30 08:08:32 -06:00
runtime: simplify buffered channels.
This change removes the retry mechanism we use for buffered channels. Instead, any sender waking up a receiver or vice versa completes the full protocol with its counterpart. This means the counterpart does not need to relock the channel when it wakes up. (Currently buffered channels need to relock on wakeup.) For sends on a channel with waiting receivers, this change replaces two copies (sender->queue, queue->receiver) with one (sender->receiver). For receives on channels with a waiting sender, two copies are still required. This change unifies to a large degree the algorithm for buffered and unbuffered channels, simplifying the overall implementation. Fixes #11506 benchmark old ns/op new ns/op delta BenchmarkChanProdCons10 125 110 -12.00% BenchmarkChanProdCons0 303 284 -6.27% BenchmarkChanProdCons100 75.5 71.3 -5.56% BenchmarkChanContended 6452 6125 -5.07% BenchmarkChanNonblocking 11.5 11.0 -4.35% BenchmarkChanCreation 149 143 -4.03% BenchmarkChanSem 63.6 61.6 -3.14% BenchmarkChanUncontended 6390 6212 -2.79% BenchmarkChanSync 282 276 -2.13% BenchmarkChanProdConsWork10 516 506 -1.94% BenchmarkChanProdConsWork0 696 685 -1.58% BenchmarkChanProdConsWork100 470 469 -0.21% BenchmarkChanPopular 660427 660012 -0.06% Change-Id: I164113a56432fbc7cace0786e49c5a6e6a708ea4 Reviewed-on: https://go-review.googlesource.com/9345 Run-TryBot: Keith Randall <khr@golang.org> Reviewed-by: Austin Clements <austin@google.com> Reviewed-by: Dmitry Vyukov <dvyukov@google.com>
This commit is contained in:
parent
7bb2a7d63b
commit
8e496f1d69
@ -6,6 +6,11 @@ package runtime
|
||||
|
||||
// This file contains the implementation of Go channels.
|
||||
|
||||
// Invariants:
|
||||
// At least one of c.sendq and c.recvq is empty.
|
||||
// For buffered channels, also:
|
||||
// c.qcount > 0 implies that c.recvq is empty.
|
||||
// c.qcount < c.dataqsiz implies that c.sendq is empty.
|
||||
import "unsafe"
|
||||
|
||||
const (
|
||||
@ -153,28 +158,33 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
|
||||
}
|
||||
|
||||
lock(&c.lock)
|
||||
|
||||
if c.closed != 0 {
|
||||
unlock(&c.lock)
|
||||
panic("send on closed channel")
|
||||
}
|
||||
|
||||
if c.dataqsiz == 0 { // synchronous channel
|
||||
sg := c.recvq.dequeue()
|
||||
if sg != nil { // found a waiting receiver
|
||||
if raceenabled {
|
||||
racesync(c, sg)
|
||||
if sg := c.recvq.dequeue(); sg != nil {
|
||||
// Found a waiting receiver. We pass the value we want to send
|
||||
// directly to the receiver, bypassing the channel buffer (if any).
|
||||
send(c, sg, ep, func() { unlock(&c.lock) })
|
||||
return true
|
||||
}
|
||||
unlock(&c.lock)
|
||||
|
||||
recvg := sg.g
|
||||
if sg.elem != nil {
|
||||
syncsend(c, sg, ep)
|
||||
if c.qcount < c.dataqsiz {
|
||||
// Space is available in the channel buffer. Enqueue the element to send.
|
||||
qp := chanbuf(c, c.sendx)
|
||||
if raceenabled {
|
||||
raceacquire(qp)
|
||||
racerelease(qp)
|
||||
}
|
||||
recvg.param = unsafe.Pointer(sg)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
typedmemmove(c.elemtype, qp, ep)
|
||||
c.sendx++
|
||||
if c.sendx == c.dataqsiz {
|
||||
c.sendx = 0
|
||||
}
|
||||
goready(recvg, 3)
|
||||
c.qcount++
|
||||
unlock(&c.lock)
|
||||
return true
|
||||
}
|
||||
|
||||
@ -183,7 +193,7 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
|
||||
return false
|
||||
}
|
||||
|
||||
// no receiver available: block on this channel.
|
||||
// Block on the channel. Some receiver will complete our operation for us.
|
||||
gp := getg()
|
||||
mysg := acquireSudog()
|
||||
mysg.releasetime = 0
|
||||
@ -192,16 +202,16 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
|
||||
}
|
||||
mysg.elem = ep
|
||||
mysg.waitlink = nil
|
||||
gp.waiting = mysg
|
||||
mysg.g = gp
|
||||
mysg.selectdone = nil
|
||||
gp.waiting = mysg
|
||||
gp.param = nil
|
||||
c.sendq.enqueue(mysg)
|
||||
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
|
||||
|
||||
// someone woke us up.
|
||||
if mysg != gp.waiting {
|
||||
throw("G waiting list is corrupted!")
|
||||
throw("G waiting list is corrupted")
|
||||
}
|
||||
gp.waiting = nil
|
||||
if gp.param == nil {
|
||||
@ -216,72 +226,49 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
|
||||
}
|
||||
releaseSudog(mysg)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// asynchronous channel
|
||||
// wait for some space to write our data
|
||||
var t1 int64
|
||||
for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
|
||||
if !block {
|
||||
unlock(&c.lock)
|
||||
return false
|
||||
}
|
||||
gp := getg()
|
||||
mysg := acquireSudog()
|
||||
mysg.releasetime = 0
|
||||
if t0 != 0 {
|
||||
mysg.releasetime = -1
|
||||
}
|
||||
mysg.g = gp
|
||||
mysg.elem = nil
|
||||
mysg.selectdone = nil
|
||||
c.sendq.enqueue(mysg)
|
||||
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)
|
||||
|
||||
// someone woke us up - try again
|
||||
if mysg.releasetime > 0 {
|
||||
t1 = mysg.releasetime
|
||||
}
|
||||
releaseSudog(mysg)
|
||||
lock(&c.lock)
|
||||
if c.closed != 0 {
|
||||
unlock(&c.lock)
|
||||
panic("send on closed channel")
|
||||
}
|
||||
}
|
||||
|
||||
// write our data into the channel buffer
|
||||
// send processes a send operation on an empty channel c.
|
||||
// The value ep sent by the sender is copied to the receiver sg.
|
||||
// The receiver is then woken up to go on its merry way.
|
||||
// Channel c must be empty and locked. send unlocks c with unlockf.
|
||||
// sg must already be dequeued from c.
|
||||
// ep must be non-nil and point to the heap or the caller's stack.
|
||||
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
|
||||
if raceenabled {
|
||||
raceacquire(chanbuf(c, c.sendx))
|
||||
racerelease(chanbuf(c, c.sendx))
|
||||
if c.dataqsiz == 0 {
|
||||
racesync(c, sg)
|
||||
} else {
|
||||
// Pretend we go through the buffer, even though
|
||||
// we copy directly. Note that we need to increment
|
||||
// the head/tail locations only when raceenabled.
|
||||
qp := chanbuf(c, c.recvx)
|
||||
raceacquire(qp)
|
||||
racerelease(qp)
|
||||
raceacquireg(sg.g, qp)
|
||||
racereleaseg(sg.g, qp)
|
||||
c.recvx++
|
||||
if c.recvx == c.dataqsiz {
|
||||
c.recvx = 0
|
||||
}
|
||||
typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
|
||||
c.sendx++
|
||||
if c.sendx == c.dataqsiz {
|
||||
c.sendx = 0
|
||||
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
|
||||
}
|
||||
c.qcount++
|
||||
|
||||
// wake up a waiting receiver
|
||||
sg := c.recvq.dequeue()
|
||||
if sg != nil {
|
||||
recvg := sg.g
|
||||
unlock(&c.lock)
|
||||
}
|
||||
unlockf()
|
||||
if sg.elem != nil {
|
||||
sendDirect(c.elemtype, sg.elem, ep)
|
||||
sg.elem = nil
|
||||
}
|
||||
gp := sg.g
|
||||
gp.param = unsafe.Pointer(sg)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
goready(recvg, 3)
|
||||
} else {
|
||||
unlock(&c.lock)
|
||||
}
|
||||
if t1 > 0 {
|
||||
blockevent(t1-t0, 2)
|
||||
}
|
||||
return true
|
||||
goready(gp, 4)
|
||||
}
|
||||
|
||||
func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
|
||||
// Send on unbuffered channel is the only operation
|
||||
func sendDirect(t *_type, dst, src unsafe.Pointer) {
|
||||
// Send on an unbuffered or empty-buffered channel is the only operation
|
||||
// in the entire runtime where one goroutine
|
||||
// writes to the stack of another goroutine. The GC assumes that
|
||||
// stack writes only happen when the goroutine is running and are
|
||||
@ -290,9 +277,8 @@ func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
|
||||
// typedmemmove will call heapBitsBulkBarrier, but the target bytes
|
||||
// are not in the heap, so that will not help. We arrange to call
|
||||
// memmove and typeBitsBulkBarrier instead.
|
||||
memmove(sg.elem, elem, c.elemtype.size)
|
||||
typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size)
|
||||
sg.elem = nil
|
||||
memmove(dst, src, t.size)
|
||||
typeBitsBulkBarrier(t, uintptr(dst), t.size)
|
||||
}
|
||||
|
||||
func closechan(c *hchan) {
|
||||
@ -320,27 +306,36 @@ func closechan(c *hchan) {
|
||||
if sg == nil {
|
||||
break
|
||||
}
|
||||
gp := sg.g
|
||||
if sg.elem != nil {
|
||||
memclr(sg.elem, uintptr(c.elemsize))
|
||||
sg.elem = nil
|
||||
gp.param = nil
|
||||
}
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
gp := sg.g
|
||||
gp.param = nil
|
||||
if raceenabled {
|
||||
raceacquireg(gp, unsafe.Pointer(c))
|
||||
}
|
||||
goready(gp, 3)
|
||||
}
|
||||
|
||||
// release all writers
|
||||
// release all writers (they will panic)
|
||||
for {
|
||||
sg := c.sendq.dequeue()
|
||||
if sg == nil {
|
||||
break
|
||||
}
|
||||
gp := sg.g
|
||||
sg.elem = nil
|
||||
gp.param = nil
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
gp := sg.g
|
||||
gp.param = nil
|
||||
if raceenabled {
|
||||
raceacquireg(gp, unsafe.Pointer(c))
|
||||
}
|
||||
goready(gp, 3)
|
||||
}
|
||||
unlock(&c.lock)
|
||||
@ -363,8 +358,10 @@ func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
|
||||
// If block == false and no elements are available, returns (false, false).
|
||||
// Otherwise, if c is closed, zeros *ep and returns (true, false).
|
||||
// Otherwise, fills in *ep with an element and returns (true, true).
|
||||
// A non-nil ep must point to the heap or the caller's stack.
|
||||
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
|
||||
// raceenabled: don't need to check ep, as it is always on the stack.
|
||||
// raceenabled: don't need to check ep, as it is always on the stack
|
||||
// or is new memory allocated by reflect.
|
||||
|
||||
if debugChan {
|
||||
print("chanrecv: chan=", c, "\n")
|
||||
@ -402,36 +399,50 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
|
||||
}
|
||||
|
||||
lock(&c.lock)
|
||||
if c.dataqsiz == 0 { // synchronous channel
|
||||
if c.closed != 0 {
|
||||
return recvclosed(c, ep)
|
||||
}
|
||||
|
||||
sg := c.sendq.dequeue()
|
||||
if sg != nil {
|
||||
if c.closed != 0 && c.qcount == 0 {
|
||||
if raceenabled {
|
||||
racesync(c, sg)
|
||||
raceacquire(unsafe.Pointer(c))
|
||||
}
|
||||
unlock(&c.lock)
|
||||
|
||||
if ep != nil {
|
||||
typedmemmove(c.elemtype, ep, sg.elem)
|
||||
memclr(ep, uintptr(c.elemsize))
|
||||
}
|
||||
sg.elem = nil
|
||||
gp := sg.g
|
||||
gp.param = unsafe.Pointer(sg)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
return true, false
|
||||
}
|
||||
goready(gp, 3)
|
||||
selected = true
|
||||
received = true
|
||||
return
|
||||
|
||||
if sg := c.sendq.dequeue(); sg != nil {
|
||||
// Found a waiting sender. If buffer is size 0, receive value
|
||||
// directly from sender. Otherwise, recieve from head of queue
|
||||
// and add sender's value to the tail of the queue (both map to
|
||||
// the same buffer slot because the queue is full).
|
||||
recv(c, sg, ep, func() { unlock(&c.lock) })
|
||||
return true, true
|
||||
}
|
||||
|
||||
if c.qcount > 0 {
|
||||
// Receive directly from queue
|
||||
qp := chanbuf(c, c.recvx)
|
||||
if raceenabled {
|
||||
raceacquire(qp)
|
||||
racerelease(qp)
|
||||
}
|
||||
if ep != nil {
|
||||
typedmemmove(c.elemtype, ep, qp)
|
||||
}
|
||||
memclr(qp, uintptr(c.elemsize))
|
||||
c.recvx++
|
||||
if c.recvx == c.dataqsiz {
|
||||
c.recvx = 0
|
||||
}
|
||||
c.qcount--
|
||||
unlock(&c.lock)
|
||||
return true, true
|
||||
}
|
||||
|
||||
if !block {
|
||||
unlock(&c.lock)
|
||||
return
|
||||
return false, false
|
||||
}
|
||||
|
||||
// no sender available: block on this channel.
|
||||
@ -452,117 +463,75 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
|
||||
|
||||
// someone woke us up
|
||||
if mysg != gp.waiting {
|
||||
throw("G waiting list is corrupted!")
|
||||
throw("G waiting list is corrupted")
|
||||
}
|
||||
gp.waiting = nil
|
||||
if mysg.releasetime > 0 {
|
||||
blockevent(mysg.releasetime-t0, 2)
|
||||
}
|
||||
haveData := gp.param != nil
|
||||
closed := gp.param == nil
|
||||
gp.param = nil
|
||||
releaseSudog(mysg)
|
||||
return true, !closed
|
||||
}
|
||||
|
||||
if haveData {
|
||||
// a sender sent us some data. It already wrote to ep.
|
||||
selected = true
|
||||
received = true
|
||||
return
|
||||
}
|
||||
|
||||
lock(&c.lock)
|
||||
if c.closed == 0 {
|
||||
throw("chanrecv: spurious wakeup")
|
||||
}
|
||||
return recvclosed(c, ep)
|
||||
}
|
||||
|
||||
// asynchronous channel
|
||||
// wait for some data to appear
|
||||
var t1 int64
|
||||
for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
|
||||
if c.closed != 0 {
|
||||
selected, received = recvclosed(c, ep)
|
||||
if t1 > 0 {
|
||||
blockevent(t1-t0, 2)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !block {
|
||||
unlock(&c.lock)
|
||||
return
|
||||
}
|
||||
|
||||
// wait for someone to send an element
|
||||
gp := getg()
|
||||
mysg := acquireSudog()
|
||||
mysg.releasetime = 0
|
||||
if t0 != 0 {
|
||||
mysg.releasetime = -1
|
||||
}
|
||||
mysg.elem = nil
|
||||
mysg.g = gp
|
||||
mysg.selectdone = nil
|
||||
|
||||
c.recvq.enqueue(mysg)
|
||||
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)
|
||||
|
||||
// someone woke us up - try again
|
||||
if mysg.releasetime > 0 {
|
||||
t1 = mysg.releasetime
|
||||
}
|
||||
releaseSudog(mysg)
|
||||
lock(&c.lock)
|
||||
}
|
||||
|
||||
// recv processes a receive operation on a full channel c.
|
||||
// There are 2 parts:
|
||||
// 1) The value sent by the sender sg is put into the channel
|
||||
// and the sender is woken up to go on its merry way.
|
||||
// 2) The value received by the receiver (the current G) is
|
||||
// written to ep.
|
||||
// For synchronous channels, both values are the same.
|
||||
// For asynchronous channels, the receiver gets its data from
|
||||
// the channel buffer and the sender's data is put in the
|
||||
// channel buffer.
|
||||
// Channel c must be full and locked. recv unlocks c with unlockf.
|
||||
// sg must already be dequeued from c.
|
||||
// A non-nil ep must point to the heap or the caller's stack.
|
||||
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
|
||||
if c.dataqsiz == 0 {
|
||||
if raceenabled {
|
||||
raceacquire(chanbuf(c, c.recvx))
|
||||
racerelease(chanbuf(c, c.recvx))
|
||||
racesync(c, sg)
|
||||
}
|
||||
unlockf()
|
||||
if ep != nil {
|
||||
typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
|
||||
// copy data from sender
|
||||
// ep points to our own stack or heap, so nothing
|
||||
// special (ala sendDirect) needed here.
|
||||
typedmemmove(c.elemtype, ep, sg.elem)
|
||||
}
|
||||
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
|
||||
|
||||
} else {
|
||||
// Queue is full. Take the item at the
|
||||
// head of the queue. Make the sender enqueue
|
||||
// its item at the tail of the queue. Since the
|
||||
// queue is full, those are both the same slot.
|
||||
qp := chanbuf(c, c.recvx)
|
||||
if raceenabled {
|
||||
raceacquire(qp)
|
||||
racerelease(qp)
|
||||
raceacquireg(sg.g, qp)
|
||||
racereleaseg(sg.g, qp)
|
||||
}
|
||||
// copy data from queue to receiver
|
||||
if ep != nil {
|
||||
typedmemmove(c.elemtype, ep, qp)
|
||||
}
|
||||
// copy data from sender to queue
|
||||
typedmemmove(c.elemtype, qp, sg.elem)
|
||||
c.recvx++
|
||||
if c.recvx == c.dataqsiz {
|
||||
c.recvx = 0
|
||||
}
|
||||
c.qcount--
|
||||
|
||||
// ping a sender now that there is space
|
||||
sg := c.sendq.dequeue()
|
||||
if sg != nil {
|
||||
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
|
||||
unlockf()
|
||||
}
|
||||
sg.elem = nil
|
||||
gp := sg.g
|
||||
unlock(&c.lock)
|
||||
gp.param = unsafe.Pointer(sg)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
goready(gp, 3)
|
||||
} else {
|
||||
unlock(&c.lock)
|
||||
}
|
||||
|
||||
if t1 > 0 {
|
||||
blockevent(t1-t0, 2)
|
||||
}
|
||||
selected = true
|
||||
received = true
|
||||
return
|
||||
}
|
||||
|
||||
// recvclosed is a helper function for chanrecv. Handles cleanup
|
||||
// when the receiver encounters a closed channel.
|
||||
// Caller must hold c.lock, recvclosed will release the lock.
|
||||
func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
|
||||
if raceenabled {
|
||||
raceacquire(unsafe.Pointer(c))
|
||||
}
|
||||
unlock(&c.lock)
|
||||
if ep != nil {
|
||||
memclr(ep, uintptr(c.elemsize))
|
||||
}
|
||||
return true, false
|
||||
goready(gp, 4)
|
||||
}
|
||||
|
||||
// compiler implements
|
||||
|
@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) {
|
||||
k *scase
|
||||
sglist *sudog
|
||||
sgnext *sudog
|
||||
futile byte
|
||||
qp unsafe.Pointer
|
||||
)
|
||||
|
||||
loop:
|
||||
@ -317,15 +317,12 @@ loop:
|
||||
|
||||
switch cas.kind {
|
||||
case caseRecv:
|
||||
if c.dataqsiz > 0 {
|
||||
if c.qcount > 0 {
|
||||
goto asyncrecv
|
||||
}
|
||||
} else {
|
||||
sg = c.sendq.dequeue()
|
||||
if sg != nil {
|
||||
goto syncrecv
|
||||
goto recv
|
||||
}
|
||||
if c.qcount > 0 {
|
||||
goto bufrecv
|
||||
}
|
||||
if c.closed != 0 {
|
||||
goto rclose
|
||||
@ -338,15 +335,12 @@ loop:
|
||||
if c.closed != 0 {
|
||||
goto sclose
|
||||
}
|
||||
if c.dataqsiz > 0 {
|
||||
if c.qcount < c.dataqsiz {
|
||||
goto asyncsend
|
||||
}
|
||||
} else {
|
||||
sg = c.recvq.dequeue()
|
||||
if sg != nil {
|
||||
goto syncsend
|
||||
goto send
|
||||
}
|
||||
if c.qcount < c.dataqsiz {
|
||||
goto bufsend
|
||||
}
|
||||
|
||||
case caseDefault:
|
||||
@ -363,6 +357,9 @@ loop:
|
||||
// pass 2 - enqueue on all chans
|
||||
gp = getg()
|
||||
done = 0
|
||||
if gp.waiting != nil {
|
||||
throw("gp.waiting != nil")
|
||||
}
|
||||
for i := 0; i < int(sel.ncase); i++ {
|
||||
cas = &scases[pollorder[i]]
|
||||
c = cas.c
|
||||
@ -389,7 +386,7 @@ loop:
|
||||
|
||||
// wait for someone to wake us up
|
||||
gp.param = nil
|
||||
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
|
||||
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2)
|
||||
|
||||
// someone woke us up
|
||||
sellock(sel)
|
||||
@ -432,16 +429,13 @@ loop:
|
||||
}
|
||||
|
||||
if cas == nil {
|
||||
futile = traceFutileWakeup
|
||||
// This can happen if we were woken up by a close().
|
||||
// TODO: figure that out explicitly so we don't need this loop.
|
||||
goto loop
|
||||
}
|
||||
|
||||
c = cas.c
|
||||
|
||||
if c.dataqsiz > 0 {
|
||||
throw("selectgo: shouldn't happen")
|
||||
}
|
||||
|
||||
if debugSelect {
|
||||
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
|
||||
}
|
||||
@ -470,7 +464,7 @@ loop:
|
||||
selunlock(sel)
|
||||
goto retc
|
||||
|
||||
asyncrecv:
|
||||
bufrecv:
|
||||
// can receive from buffer
|
||||
if raceenabled {
|
||||
if cas.elem != nil {
|
||||
@ -485,29 +479,20 @@ asyncrecv:
|
||||
if cas.receivedp != nil {
|
||||
*cas.receivedp = true
|
||||
}
|
||||
qp = chanbuf(c, c.recvx)
|
||||
if cas.elem != nil {
|
||||
typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx))
|
||||
typedmemmove(c.elemtype, cas.elem, qp)
|
||||
}
|
||||
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
|
||||
memclr(qp, uintptr(c.elemsize))
|
||||
c.recvx++
|
||||
if c.recvx == c.dataqsiz {
|
||||
c.recvx = 0
|
||||
}
|
||||
c.qcount--
|
||||
sg = c.sendq.dequeue()
|
||||
if sg != nil {
|
||||
gp = sg.g
|
||||
selunlock(sel)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
goready(gp, 3)
|
||||
} else {
|
||||
selunlock(sel)
|
||||
}
|
||||
goto retc
|
||||
|
||||
asyncsend:
|
||||
bufsend:
|
||||
// can send to buffer
|
||||
if raceenabled {
|
||||
raceacquire(chanbuf(c, c.sendx))
|
||||
@ -523,47 +508,18 @@ asyncsend:
|
||||
c.sendx = 0
|
||||
}
|
||||
c.qcount++
|
||||
sg = c.recvq.dequeue()
|
||||
if sg != nil {
|
||||
gp = sg.g
|
||||
selunlock(sel)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
goready(gp, 3)
|
||||
} else {
|
||||
selunlock(sel)
|
||||
}
|
||||
goto retc
|
||||
|
||||
syncrecv:
|
||||
recv:
|
||||
// can receive from sleeping sender (sg)
|
||||
if raceenabled {
|
||||
if cas.elem != nil {
|
||||
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
|
||||
}
|
||||
racesync(c, sg)
|
||||
}
|
||||
if msanenabled && cas.elem != nil {
|
||||
msanwrite(cas.elem, c.elemtype.size)
|
||||
}
|
||||
selunlock(sel)
|
||||
recv(c, sg, cas.elem, func() { selunlock(sel) })
|
||||
if debugSelect {
|
||||
print("syncrecv: sel=", sel, " c=", c, "\n")
|
||||
}
|
||||
if cas.receivedp != nil {
|
||||
*cas.receivedp = true
|
||||
}
|
||||
if cas.elem != nil {
|
||||
typedmemmove(c.elemtype, cas.elem, sg.elem)
|
||||
}
|
||||
sg.elem = nil
|
||||
gp = sg.g
|
||||
gp.param = unsafe.Pointer(sg)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
goready(gp, 3)
|
||||
goto retc
|
||||
|
||||
rclose:
|
||||
@ -580,29 +536,19 @@ rclose:
|
||||
}
|
||||
goto retc
|
||||
|
||||
syncsend:
|
||||
// can send to sleeping receiver (sg)
|
||||
send:
|
||||
// can send to a sleeping receiver (sg)
|
||||
if raceenabled {
|
||||
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
|
||||
racesync(c, sg)
|
||||
}
|
||||
if msanenabled {
|
||||
msanread(cas.elem, c.elemtype.size)
|
||||
}
|
||||
selunlock(sel)
|
||||
send(c, sg, cas.elem, func() { selunlock(sel) })
|
||||
if debugSelect {
|
||||
print("syncsend: sel=", sel, " c=", c, "\n")
|
||||
}
|
||||
if sg.elem != nil {
|
||||
syncsend(c, sg, cas.elem)
|
||||
}
|
||||
sg.elem = nil
|
||||
gp = sg.g
|
||||
gp.param = unsafe.Pointer(sg)
|
||||
if sg.releasetime != 0 {
|
||||
sg.releasetime = cputicks()
|
||||
}
|
||||
goready(gp, 3)
|
||||
goto retc
|
||||
|
||||
retc:
|
||||
if cas.releasetime > 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user