diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 96ac306624..966e4a9743 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -6,6 +6,11 @@ package runtime // This file contains the implementation of Go channels. +// Invariants: +// At least one of c.sendq and c.recvq is empty. +// For buffered channels, also: +// c.qcount > 0 implies that c.recvq is empty. +// c.qcount < c.dataqsiz implies that c.sendq is empty. import "unsafe" const ( @@ -153,135 +158,117 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin } lock(&c.lock) + if c.closed != 0 { unlock(&c.lock) panic("send on closed channel") } - if c.dataqsiz == 0 { // synchronous channel - sg := c.recvq.dequeue() - if sg != nil { // found a waiting receiver - if raceenabled { - racesync(c, sg) - } - unlock(&c.lock) - - recvg := sg.g - if sg.elem != nil { - syncsend(c, sg, ep) - } - recvg.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(recvg, 3) - return true - } - - if !block { - unlock(&c.lock) - return false - } - - // no receiver available: block on this channel. - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = ep - mysg.waitlink = nil - gp.waiting = mysg - mysg.g = gp - mysg.selectdone = nil - gp.param = nil - c.sendq.enqueue(mysg) - goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) - - // someone woke us up. - if mysg != gp.waiting { - throw("G waiting list is corrupted!") - } - gp.waiting = nil - if gp.param == nil { - if c.closed == 0 { - throw("chansend: spurious wakeup") - } - panic("send on closed channel") - } - gp.param = nil - if mysg.releasetime > 0 { - blockevent(int64(mysg.releasetime)-t0, 2) - } - releaseSudog(mysg) + if sg := c.recvq.dequeue(); sg != nil { + // Found a waiting receiver. We pass the value we want to send + // directly to the receiver, bypassing the channel buffer (if any). + send(c, sg, ep, func() { unlock(&c.lock) }) return true } - // asynchronous channel - // wait for some space to write our data - var t1 int64 - for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup { - if !block { - unlock(&c.lock) - return false + if c.qcount < c.dataqsiz { + // Space is available in the channel buffer. Enqueue the element to send. + qp := chanbuf(c, c.sendx) + if raceenabled { + raceacquire(qp) + racerelease(qp) } - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 + typedmemmove(c.elemtype, qp, ep) + c.sendx++ + if c.sendx == c.dataqsiz { + c.sendx = 0 } - mysg.g = gp - mysg.elem = nil - mysg.selectdone = nil - c.sendq.enqueue(mysg) - goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3) - - // someone woke us up - try again - if mysg.releasetime > 0 { - t1 = mysg.releasetime - } - releaseSudog(mysg) - lock(&c.lock) - if c.closed != 0 { - unlock(&c.lock) - panic("send on closed channel") - } - } - - // write our data into the channel buffer - if raceenabled { - raceacquire(chanbuf(c, c.sendx)) - racerelease(chanbuf(c, c.sendx)) - } - typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep) - c.sendx++ - if c.sendx == c.dataqsiz { - c.sendx = 0 - } - c.qcount++ - - // wake up a waiting receiver - sg := c.recvq.dequeue() - if sg != nil { - recvg := sg.g + c.qcount++ unlock(&c.lock) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(recvg, 3) - } else { + return true + } + + if !block { unlock(&c.lock) + return false } - if t1 > 0 { - blockevent(t1-t0, 2) + + // Block on the channel. Some receiver will complete our operation for us. + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 } + mysg.elem = ep + mysg.waitlink = nil + mysg.g = gp + mysg.selectdone = nil + gp.waiting = mysg + gp.param = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) + + // someone woke us up. + if mysg != gp.waiting { + throw("G waiting list is corrupted") + } + gp.waiting = nil + if gp.param == nil { + if c.closed == 0 { + throw("chansend: spurious wakeup") + } + panic("send on closed channel") + } + gp.param = nil + if mysg.releasetime > 0 { + blockevent(int64(mysg.releasetime)-t0, 2) + } + releaseSudog(mysg) return true } -func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) { - // Send on unbuffered channel is the only operation +// send processes a send operation on an empty channel c. +// The value ep sent by the sender is copied to the receiver sg. +// The receiver is then woken up to go on its merry way. +// Channel c must be empty and locked. send unlocks c with unlockf. +// sg must already be dequeued from c. +// ep must be non-nil and point to the heap or the caller's stack. +func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { + if raceenabled { + if c.dataqsiz == 0 { + racesync(c, sg) + } else { + // Pretend we go through the buffer, even though + // we copy directly. Note that we need to increment + // the head/tail locations only when raceenabled. + qp := chanbuf(c, c.recvx) + raceacquire(qp) + racerelease(qp) + raceacquireg(sg.g, qp) + racereleaseg(sg.g, qp) + c.recvx++ + if c.recvx == c.dataqsiz { + c.recvx = 0 + } + c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz + } + } + unlockf() + if sg.elem != nil { + sendDirect(c.elemtype, sg.elem, ep) + sg.elem = nil + } + gp := sg.g + gp.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 4) +} + +func sendDirect(t *_type, dst, src unsafe.Pointer) { + // Send on an unbuffered or empty-buffered channel is the only operation // in the entire runtime where one goroutine // writes to the stack of another goroutine. The GC assumes that // stack writes only happen when the goroutine is running and are @@ -290,9 +277,8 @@ func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) { // typedmemmove will call heapBitsBulkBarrier, but the target bytes // are not in the heap, so that will not help. We arrange to call // memmove and typeBitsBulkBarrier instead. - memmove(sg.elem, elem, c.elemtype.size) - typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size) - sg.elem = nil + memmove(dst, src, t.size) + typeBitsBulkBarrier(t, uintptr(dst), t.size) } func closechan(c *hchan) { @@ -320,27 +306,36 @@ func closechan(c *hchan) { if sg == nil { break } - gp := sg.g - sg.elem = nil - gp.param = nil + if sg.elem != nil { + memclr(sg.elem, uintptr(c.elemsize)) + sg.elem = nil + } if sg.releasetime != 0 { sg.releasetime = cputicks() } + gp := sg.g + gp.param = nil + if raceenabled { + raceacquireg(gp, unsafe.Pointer(c)) + } goready(gp, 3) } - // release all writers + // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } - gp := sg.g sg.elem = nil - gp.param = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } + gp := sg.g + gp.param = nil + if raceenabled { + raceacquireg(gp, unsafe.Pointer(c)) + } goready(gp, 3) } unlock(&c.lock) @@ -363,8 +358,10 @@ func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) { // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). +// A non-nil ep must point to the heap or the caller's stack. func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { - // raceenabled: don't need to check ep, as it is always on the stack. + // raceenabled: don't need to check ep, as it is always on the stack + // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") @@ -402,167 +399,139 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r } lock(&c.lock) - if c.dataqsiz == 0 { // synchronous channel - if c.closed != 0 { - return recvclosed(c, ep) + + if c.closed != 0 && c.qcount == 0 { + if raceenabled { + raceacquire(unsafe.Pointer(c)) } - - sg := c.sendq.dequeue() - if sg != nil { - if raceenabled { - racesync(c, sg) - } - unlock(&c.lock) - - if ep != nil { - typedmemmove(c.elemtype, ep, sg.elem) - } - sg.elem = nil - gp := sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) - selected = true - received = true - return - } - - if !block { - unlock(&c.lock) - return - } - - // no sender available: block on this channel. - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = ep - mysg.waitlink = nil - gp.waiting = mysg - mysg.g = gp - mysg.selectdone = nil - gp.param = nil - c.recvq.enqueue(mysg) - goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) - - // someone woke us up - if mysg != gp.waiting { - throw("G waiting list is corrupted!") - } - gp.waiting = nil - if mysg.releasetime > 0 { - blockevent(mysg.releasetime-t0, 2) - } - haveData := gp.param != nil - gp.param = nil - releaseSudog(mysg) - - if haveData { - // a sender sent us some data. It already wrote to ep. - selected = true - received = true - return - } - - lock(&c.lock) - if c.closed == 0 { - throw("chanrecv: spurious wakeup") - } - return recvclosed(c, ep) - } - - // asynchronous channel - // wait for some data to appear - var t1 int64 - for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup { - if c.closed != 0 { - selected, received = recvclosed(c, ep) - if t1 > 0 { - blockevent(t1-t0, 2) - } - return - } - - if !block { - unlock(&c.lock) - return - } - - // wait for someone to send an element - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = nil - mysg.g = gp - mysg.selectdone = nil - - c.recvq.enqueue(mysg) - goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3) - - // someone woke us up - try again - if mysg.releasetime > 0 { - t1 = mysg.releasetime - } - releaseSudog(mysg) - lock(&c.lock) - } - - if raceenabled { - raceacquire(chanbuf(c, c.recvx)) - racerelease(chanbuf(c, c.recvx)) - } - if ep != nil { - typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx)) - } - memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) - - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 - } - c.qcount-- - - // ping a sender now that there is space - sg := c.sendq.dequeue() - if sg != nil { - gp := sg.g unlock(&c.lock) - if sg.releasetime != 0 { - sg.releasetime = cputicks() + if ep != nil { + memclr(ep, uintptr(c.elemsize)) } - goready(gp, 3) - } else { - unlock(&c.lock) + return true, false } - if t1 > 0 { - blockevent(t1-t0, 2) + if sg := c.sendq.dequeue(); sg != nil { + // Found a waiting sender. If buffer is size 0, receive value + // directly from sender. Otherwise, recieve from head of queue + // and add sender's value to the tail of the queue (both map to + // the same buffer slot because the queue is full). + recv(c, sg, ep, func() { unlock(&c.lock) }) + return true, true } - selected = true - received = true - return + + if c.qcount > 0 { + // Receive directly from queue + qp := chanbuf(c, c.recvx) + if raceenabled { + raceacquire(qp) + racerelease(qp) + } + if ep != nil { + typedmemmove(c.elemtype, ep, qp) + } + memclr(qp, uintptr(c.elemsize)) + c.recvx++ + if c.recvx == c.dataqsiz { + c.recvx = 0 + } + c.qcount-- + unlock(&c.lock) + return true, true + } + + if !block { + unlock(&c.lock) + return false, false + } + + // no sender available: block on this channel. + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = ep + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.recvq.enqueue(mysg) + goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) + + // someone woke us up + if mysg != gp.waiting { + throw("G waiting list is corrupted") + } + gp.waiting = nil + if mysg.releasetime > 0 { + blockevent(mysg.releasetime-t0, 2) + } + closed := gp.param == nil + gp.param = nil + releaseSudog(mysg) + return true, !closed } -// recvclosed is a helper function for chanrecv. Handles cleanup -// when the receiver encounters a closed channel. -// Caller must hold c.lock, recvclosed will release the lock. -func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) { - if raceenabled { - raceacquire(unsafe.Pointer(c)) +// recv processes a receive operation on a full channel c. +// There are 2 parts: +// 1) The value sent by the sender sg is put into the channel +// and the sender is woken up to go on its merry way. +// 2) The value received by the receiver (the current G) is +// written to ep. +// For synchronous channels, both values are the same. +// For asynchronous channels, the receiver gets its data from +// the channel buffer and the sender's data is put in the +// channel buffer. +// Channel c must be full and locked. recv unlocks c with unlockf. +// sg must already be dequeued from c. +// A non-nil ep must point to the heap or the caller's stack. +func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { + if c.dataqsiz == 0 { + if raceenabled { + racesync(c, sg) + } + unlockf() + if ep != nil { + // copy data from sender + // ep points to our own stack or heap, so nothing + // special (ala sendDirect) needed here. + typedmemmove(c.elemtype, ep, sg.elem) + } + } else { + // Queue is full. Take the item at the + // head of the queue. Make the sender enqueue + // its item at the tail of the queue. Since the + // queue is full, those are both the same slot. + qp := chanbuf(c, c.recvx) + if raceenabled { + raceacquire(qp) + racerelease(qp) + raceacquireg(sg.g, qp) + racereleaseg(sg.g, qp) + } + // copy data from queue to receiver + if ep != nil { + typedmemmove(c.elemtype, ep, qp) + } + // copy data from sender to queue + typedmemmove(c.elemtype, qp, sg.elem) + c.recvx++ + if c.recvx == c.dataqsiz { + c.recvx = 0 + } + c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz + unlockf() } - unlock(&c.lock) - if ep != nil { - memclr(ep, uintptr(c.elemsize)) + sg.elem = nil + gp := sg.g + gp.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } - return true, false + goready(gp, 4) } // compiler implements diff --git a/src/runtime/select.go b/src/runtime/select.go index 8b6c3ed4c0..508a19b630 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) { k *scase sglist *sudog sgnext *sudog - futile byte + qp unsafe.Pointer ) loop: @@ -317,15 +317,12 @@ loop: switch cas.kind { case caseRecv: - if c.dataqsiz > 0 { - if c.qcount > 0 { - goto asyncrecv - } - } else { - sg = c.sendq.dequeue() - if sg != nil { - goto syncrecv - } + sg = c.sendq.dequeue() + if sg != nil { + goto recv + } + if c.qcount > 0 { + goto bufrecv } if c.closed != 0 { goto rclose @@ -338,15 +335,12 @@ loop: if c.closed != 0 { goto sclose } - if c.dataqsiz > 0 { - if c.qcount < c.dataqsiz { - goto asyncsend - } - } else { - sg = c.recvq.dequeue() - if sg != nil { - goto syncsend - } + sg = c.recvq.dequeue() + if sg != nil { + goto send + } + if c.qcount < c.dataqsiz { + goto bufsend } case caseDefault: @@ -363,6 +357,9 @@ loop: // pass 2 - enqueue on all chans gp = getg() done = 0 + if gp.waiting != nil { + throw("gp.waiting != nil") + } for i := 0; i < int(sel.ncase); i++ { cas = &scases[pollorder[i]] c = cas.c @@ -389,7 +386,7 @@ loop: // wait for someone to wake us up gp.param = nil - gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2) + gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2) // someone woke us up sellock(sel) @@ -432,16 +429,13 @@ loop: } if cas == nil { - futile = traceFutileWakeup + // This can happen if we were woken up by a close(). + // TODO: figure that out explicitly so we don't need this loop. goto loop } c = cas.c - if c.dataqsiz > 0 { - throw("selectgo: shouldn't happen") - } - if debugSelect { print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n") } @@ -470,7 +464,7 @@ loop: selunlock(sel) goto retc -asyncrecv: +bufrecv: // can receive from buffer if raceenabled { if cas.elem != nil { @@ -485,29 +479,20 @@ asyncrecv: if cas.receivedp != nil { *cas.receivedp = true } + qp = chanbuf(c, c.recvx) if cas.elem != nil { - typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx)) + typedmemmove(c.elemtype, cas.elem, qp) } - memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) + memclr(qp, uintptr(c.elemsize)) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- - sg = c.sendq.dequeue() - if sg != nil { - gp = sg.g - selunlock(sel) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) - } else { - selunlock(sel) - } + selunlock(sel) goto retc -asyncsend: +bufsend: // can send to buffer if raceenabled { raceacquire(chanbuf(c, c.sendx)) @@ -523,47 +508,18 @@ asyncsend: c.sendx = 0 } c.qcount++ - sg = c.recvq.dequeue() - if sg != nil { - gp = sg.g - selunlock(sel) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) - } else { - selunlock(sel) - } + selunlock(sel) goto retc -syncrecv: +recv: // can receive from sleeping sender (sg) - if raceenabled { - if cas.elem != nil { - raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc) - } - racesync(c, sg) - } - if msanenabled && cas.elem != nil { - msanwrite(cas.elem, c.elemtype.size) - } - selunlock(sel) + recv(c, sg, cas.elem, func() { selunlock(sel) }) if debugSelect { print("syncrecv: sel=", sel, " c=", c, "\n") } if cas.receivedp != nil { *cas.receivedp = true } - if cas.elem != nil { - typedmemmove(c.elemtype, cas.elem, sg.elem) - } - sg.elem = nil - gp = sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) goto retc rclose: @@ -580,29 +536,19 @@ rclose: } goto retc -syncsend: - // can send to sleeping receiver (sg) +send: + // can send to a sleeping receiver (sg) if raceenabled { raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc) - racesync(c, sg) } if msanenabled { msanread(cas.elem, c.elemtype.size) } - selunlock(sel) + send(c, sg, cas.elem, func() { selunlock(sel) }) if debugSelect { print("syncsend: sel=", sel, " c=", c, "\n") } - if sg.elem != nil { - syncsend(c, sg, cas.elem) - } - sg.elem = nil - gp = sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } - goready(gp, 3) + goto retc retc: if cas.releasetime > 0 {