1
0
mirror of https://github.com/golang/go synced 2024-11-18 03:04:45 -07:00

Revert "runtime: simplify buffered channels."

Revert for now until #13169 is understood.

This reverts commit 8e496f1d69.

Change-Id: Ib3eb2588824ef47a2b6eb9e377a24e5c817fcc81
Reviewed-on: https://go-review.googlesource.com/16716
Reviewed-by: Keith Randall <khr@golang.org>
This commit is contained in:
Keith Randall 2015-11-06 08:29:53 +00:00
parent 26263354a3
commit e9f90ba246
2 changed files with 358 additions and 273 deletions

View File

@ -6,11 +6,6 @@ package runtime
// This file contains the implementation of Go channels. // This file contains the implementation of Go channels.
// Invariants:
// At least one of c.sendq and c.recvq is empty.
// For buffered channels, also:
// c.qcount > 0 implies that c.recvq is empty.
// c.qcount < c.dataqsiz implies that c.sendq is empty.
import "unsafe" import "unsafe"
const ( const (
@ -158,117 +153,135 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
} }
lock(&c.lock) lock(&c.lock)
if c.closed != 0 { if c.closed != 0 {
unlock(&c.lock) unlock(&c.lock)
panic("send on closed channel") panic("send on closed channel")
} }
if sg := c.recvq.dequeue(); sg != nil { if c.dataqsiz == 0 { // synchronous channel
// Found a waiting receiver. We pass the value we want to send sg := c.recvq.dequeue()
// directly to the receiver, bypassing the channel buffer (if any). if sg != nil { // found a waiting receiver
send(c, sg, ep, func() { unlock(&c.lock) }) if raceenabled {
racesync(c, sg)
}
unlock(&c.lock)
recvg := sg.g
if sg.elem != nil {
syncsend(c, sg, ep)
}
recvg.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(recvg, 3)
return true
}
if !block {
unlock(&c.lock)
return false
}
// no receiver available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted!")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic("send on closed channel")
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(int64(mysg.releasetime)-t0, 2)
}
releaseSudog(mysg)
return true return true
} }
if c.qcount < c.dataqsiz { // asynchronous channel
// Space is available in the channel buffer. Enqueue the element to send. // wait for some space to write our data
qp := chanbuf(c, c.sendx) var t1 int64
if raceenabled { for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
raceacquire(qp) if !block {
racerelease(qp) unlock(&c.lock)
return false
} }
typedmemmove(c.elemtype, qp, ep) gp := getg()
c.sendx++ mysg := acquireSudog()
if c.sendx == c.dataqsiz { mysg.releasetime = 0
c.sendx = 0 if t0 != 0 {
mysg.releasetime = -1
} }
c.qcount++ mysg.g = gp
mysg.elem = nil
mysg.selectdone = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)
// someone woke us up - try again
if mysg.releasetime > 0 {
t1 = mysg.releasetime
}
releaseSudog(mysg)
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
}
// write our data into the channel buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// wake up a waiting receiver
sg := c.recvq.dequeue()
if sg != nil {
recvg := sg.g
unlock(&c.lock) unlock(&c.lock)
return true if sg.releasetime != 0 {
} sg.releasetime = cputicks()
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
} }
panic("send on closed channel") goready(recvg, 3)
} else {
unlock(&c.lock)
} }
gp.param = nil if t1 > 0 {
if mysg.releasetime > 0 { blockevent(t1-t0, 2)
blockevent(int64(mysg.releasetime)-t0, 2)
} }
releaseSudog(mysg)
return true return true
} }
// send processes a send operation on an empty channel c. func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
// The value ep sent by the sender is copied to the receiver sg. // Send on unbuffered channel is the only operation
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
unlockf()
if sg.elem != nil {
sendDirect(c.elemtype, sg.elem, ep)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 4)
}
func sendDirect(t *_type, dst, src unsafe.Pointer) {
// Send on an unbuffered or empty-buffered channel is the only operation
// in the entire runtime where one goroutine // in the entire runtime where one goroutine
// writes to the stack of another goroutine. The GC assumes that // writes to the stack of another goroutine. The GC assumes that
// stack writes only happen when the goroutine is running and are // stack writes only happen when the goroutine is running and are
@ -277,8 +290,9 @@ func sendDirect(t *_type, dst, src unsafe.Pointer) {
// typedmemmove will call heapBitsBulkBarrier, but the target bytes // typedmemmove will call heapBitsBulkBarrier, but the target bytes
// are not in the heap, so that will not help. We arrange to call // are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead. // memmove and typeBitsBulkBarrier instead.
memmove(dst, src, t.size) memmove(sg.elem, elem, c.elemtype.size)
typeBitsBulkBarrier(t, uintptr(dst), t.size) typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size)
sg.elem = nil
} }
func closechan(c *hchan) { func closechan(c *hchan) {
@ -306,36 +320,27 @@ func closechan(c *hchan) {
if sg == nil { if sg == nil {
break break
} }
if sg.elem != nil { gp := sg.g
memclr(sg.elem, uintptr(c.elemsize)) sg.elem = nil
sg.elem = nil gp.param = nil
}
if sg.releasetime != 0 { if sg.releasetime != 0 {
sg.releasetime = cputicks() sg.releasetime = cputicks()
} }
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
goready(gp, 3) goready(gp, 3)
} }
// release all writers (they will panic) // release all writers
for { for {
sg := c.sendq.dequeue() sg := c.sendq.dequeue()
if sg == nil { if sg == nil {
break break
} }
gp := sg.g
sg.elem = nil sg.elem = nil
gp.param = nil
if sg.releasetime != 0 { if sg.releasetime != 0 {
sg.releasetime = cputicks() sg.releasetime = cputicks()
} }
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
goready(gp, 3) goready(gp, 3)
} }
unlock(&c.lock) unlock(&c.lock)
@ -358,10 +363,8 @@ func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
// If block == false and no elements are available, returns (false, false). // If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true). // Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack // raceenabled: don't need to check ep, as it is always on the stack.
// or is new memory allocated by reflect.
if debugChan { if debugChan {
print("chanrecv: chan=", c, "\n") print("chanrecv: chan=", c, "\n")
@ -399,139 +402,167 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
} }
lock(&c.lock) lock(&c.lock)
if c.dataqsiz == 0 { // synchronous channel
if c.closed != 0 && c.qcount == 0 { if c.closed != 0 {
if raceenabled { return recvclosed(c, ep)
raceacquire(unsafe.Pointer(c))
} }
sg := c.sendq.dequeue()
if sg != nil {
if raceenabled {
racesync(c, sg)
}
unlock(&c.lock)
if ep != nil {
typedmemmove(c.elemtype, ep, sg.elem)
}
sg.elem = nil
gp := sg.g
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
selected = true
received = true
return
}
if !block {
unlock(&c.lock)
return
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted!")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
haveData := gp.param != nil
gp.param = nil
releaseSudog(mysg)
if haveData {
// a sender sent us some data. It already wrote to ep.
selected = true
received = true
return
}
lock(&c.lock)
if c.closed == 0 {
throw("chanrecv: spurious wakeup")
}
return recvclosed(c, ep)
}
// asynchronous channel
// wait for some data to appear
var t1 int64
for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
if c.closed != 0 {
selected, received = recvclosed(c, ep)
if t1 > 0 {
blockevent(t1-t0, 2)
}
return
}
if !block {
unlock(&c.lock)
return
}
// wait for someone to send an element
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = nil
mysg.g = gp
mysg.selectdone = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)
// someone woke us up - try again
if mysg.releasetime > 0 {
t1 = mysg.releasetime
}
releaseSudog(mysg)
lock(&c.lock)
}
if raceenabled {
raceacquire(chanbuf(c, c.recvx))
racerelease(chanbuf(c, c.recvx))
}
if ep != nil {
typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
}
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
// ping a sender now that there is space
sg := c.sendq.dequeue()
if sg != nil {
gp := sg.g
unlock(&c.lock) unlock(&c.lock)
if ep != nil { if sg.releasetime != 0 {
memclr(ep, uintptr(c.elemsize)) sg.releasetime = cputicks()
} }
return true, false goready(gp, 3)
} } else {
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, recieve from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
memclr(qp, uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock) unlock(&c.lock)
return true, true
} }
if !block { if t1 > 0 {
unlock(&c.lock) blockevent(t1-t0, 2)
return false, false
} }
selected = true
// no sender available: block on this channel. received = true
gp := getg() return
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
} }
// recv processes a receive operation on a full channel c. // recvclosed is a helper function for chanrecv. Handles cleanup
// There are 2 parts: // when the receiver encounters a closed channel.
// 1) The value sent by the sender sg is put into the channel // Caller must hold c.lock, recvclosed will release the lock.
// and the sender is woken up to go on its merry way. func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
// 2) The value received by the receiver (the current G) is if raceenabled {
// written to ep. raceacquire(unsafe.Pointer(c))
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
unlockf()
if ep != nil {
// copy data from sender
// ep points to our own stack or heap, so nothing
// special (ala sendDirect) needed here.
typedmemmove(c.elemtype, ep, sg.elem)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
unlockf()
} }
sg.elem = nil unlock(&c.lock)
gp := sg.g if ep != nil {
gp.param = unsafe.Pointer(sg) memclr(ep, uintptr(c.elemsize))
if sg.releasetime != 0 {
sg.releasetime = cputicks()
} }
goready(gp, 4) return true, false
} }
// compiler implements // compiler implements

View File

@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) {
k *scase k *scase
sglist *sudog sglist *sudog
sgnext *sudog sgnext *sudog
qp unsafe.Pointer futile byte
) )
loop: loop:
@ -317,12 +317,15 @@ loop:
switch cas.kind { switch cas.kind {
case caseRecv: case caseRecv:
sg = c.sendq.dequeue() if c.dataqsiz > 0 {
if sg != nil { if c.qcount > 0 {
goto recv goto asyncrecv
} }
if c.qcount > 0 { } else {
goto bufrecv sg = c.sendq.dequeue()
if sg != nil {
goto syncrecv
}
} }
if c.closed != 0 { if c.closed != 0 {
goto rclose goto rclose
@ -335,12 +338,15 @@ loop:
if c.closed != 0 { if c.closed != 0 {
goto sclose goto sclose
} }
sg = c.recvq.dequeue() if c.dataqsiz > 0 {
if sg != nil { if c.qcount < c.dataqsiz {
goto send goto asyncsend
} }
if c.qcount < c.dataqsiz { } else {
goto bufsend sg = c.recvq.dequeue()
if sg != nil {
goto syncsend
}
} }
case caseDefault: case caseDefault:
@ -357,9 +363,6 @@ loop:
// pass 2 - enqueue on all chans // pass 2 - enqueue on all chans
gp = getg() gp = getg()
done = 0 done = 0
if gp.waiting != nil {
throw("gp.waiting != nil")
}
for i := 0; i < int(sel.ncase); i++ { for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]] cas = &scases[pollorder[i]]
c = cas.c c = cas.c
@ -386,7 +389,7 @@ loop:
// wait for someone to wake us up // wait for someone to wake us up
gp.param = nil gp.param = nil
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2) gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
// someone woke us up // someone woke us up
sellock(sel) sellock(sel)
@ -429,13 +432,16 @@ loop:
} }
if cas == nil { if cas == nil {
// This can happen if we were woken up by a close(). futile = traceFutileWakeup
// TODO: figure that out explicitly so we don't need this loop.
goto loop goto loop
} }
c = cas.c c = cas.c
if c.dataqsiz > 0 {
throw("selectgo: shouldn't happen")
}
if debugSelect { if debugSelect {
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n") print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
} }
@ -464,7 +470,7 @@ loop:
selunlock(sel) selunlock(sel)
goto retc goto retc
bufrecv: asyncrecv:
// can receive from buffer // can receive from buffer
if raceenabled { if raceenabled {
if cas.elem != nil { if cas.elem != nil {
@ -479,20 +485,29 @@ bufrecv:
if cas.receivedp != nil { if cas.receivedp != nil {
*cas.receivedp = true *cas.receivedp = true
} }
qp = chanbuf(c, c.recvx)
if cas.elem != nil { if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp) typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx))
} }
memclr(qp, uintptr(c.elemsize)) memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
c.recvx++ c.recvx++
if c.recvx == c.dataqsiz { if c.recvx == c.dataqsiz {
c.recvx = 0 c.recvx = 0
} }
c.qcount-- c.qcount--
selunlock(sel) sg = c.sendq.dequeue()
if sg != nil {
gp = sg.g
selunlock(sel)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
} else {
selunlock(sel)
}
goto retc goto retc
bufsend: asyncsend:
// can send to buffer // can send to buffer
if raceenabled { if raceenabled {
raceacquire(chanbuf(c, c.sendx)) raceacquire(chanbuf(c, c.sendx))
@ -508,18 +523,47 @@ bufsend:
c.sendx = 0 c.sendx = 0
} }
c.qcount++ c.qcount++
selunlock(sel) sg = c.recvq.dequeue()
if sg != nil {
gp = sg.g
selunlock(sel)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
} else {
selunlock(sel)
}
goto retc goto retc
recv: syncrecv:
// can receive from sleeping sender (sg) // can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(sel) }) if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
racesync(c, sg)
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
selunlock(sel)
if debugSelect { if debugSelect {
print("syncrecv: sel=", sel, " c=", c, "\n") print("syncrecv: sel=", sel, " c=", c, "\n")
} }
if cas.receivedp != nil { if cas.receivedp != nil {
*cas.receivedp = true *cas.receivedp = true
} }
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, sg.elem)
}
sg.elem = nil
gp = sg.g
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
goto retc goto retc
rclose: rclose:
@ -536,19 +580,29 @@ rclose:
} }
goto retc goto retc
send: syncsend:
// can send to a sleeping receiver (sg) // can send to sleeping receiver (sg)
if raceenabled { if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc) raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
racesync(c, sg)
} }
if msanenabled { if msanenabled {
msanread(cas.elem, c.elemtype.size) msanread(cas.elem, c.elemtype.size)
} }
send(c, sg, cas.elem, func() { selunlock(sel) }) selunlock(sel)
if debugSelect { if debugSelect {
print("syncsend: sel=", sel, " c=", c, "\n") print("syncsend: sel=", sel, " c=", c, "\n")
} }
goto retc if sg.elem != nil {
syncsend(c, sg, cas.elem)
}
sg.elem = nil
gp = sg.g
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, 3)
retc: retc:
if cas.releasetime > 0 { if cas.releasetime > 0 {