diff --git a/src/cmd/api/goapi.go b/src/cmd/api/goapi.go index 7117254e534..2900a27cebe 100644 --- a/src/cmd/api/goapi.go +++ b/src/cmd/api/goapi.go @@ -383,6 +383,7 @@ func (w *Walker) parseFile(dir, file string) (*ast.File, error) { " mspan struct{}; m struct{}; lock struct{}; slicetype struct{};" + " iface struct{}; eface struct{}; interfacetype struct{}; itab struct{};" + " mcache struct{}; bucket struct{}; sudog struct{}; g struct{};" + + " hchan struct{}; chantype struct{}; waitq struct{};" + " note struct{};" + ")" f, err = parser.ParseFile(fset, filename, src, 0) diff --git a/src/cmd/gc/select.c b/src/cmd/gc/select.c index cbc199698d6..a8caefbb531 100644 --- a/src/cmd/gc/select.c +++ b/src/cmd/gc/select.c @@ -340,6 +340,7 @@ selecttype(int32 size) sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("elem")), typenod(ptrto(types[TUINT8])))); sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("releasetime")), typenod(types[TUINT64]))); sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("nrelease")), typenod(types[TINT32]))); + sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("waitlink")), typenod(ptrto(types[TUINT8])))); typecheck(&sudog, Etype); sudog->type->noalg = 1; sudog->type->local = 1; diff --git a/src/pkg/reflect/asm_386.s b/src/pkg/reflect/asm_386.s index 8c84bba43bd..933908f206e 100644 --- a/src/pkg/reflect/asm_386.s +++ b/src/pkg/reflect/asm_386.s @@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0 JMP runtime·newobject(SB) TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0 JMP runtime·newarray(SB) +TEXT ·makechan(SB),NOSPLIT,$0-0 + JMP runtime·makechan(SB) diff --git a/src/pkg/reflect/asm_amd64.s b/src/pkg/reflect/asm_amd64.s index 195928bff9f..85de6ea8270 100644 --- a/src/pkg/reflect/asm_amd64.s +++ b/src/pkg/reflect/asm_amd64.s @@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0 JMP runtime·newobject(SB) TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0 JMP runtime·newarray(SB) +TEXT ·makechan(SB),NOSPLIT,$0-0 + JMP runtime·makechan(SB) diff --git a/src/pkg/reflect/asm_amd64p32.s b/src/pkg/reflect/asm_amd64p32.s index 8c84bba43bd..933908f206e 100644 --- a/src/pkg/reflect/asm_amd64p32.s +++ b/src/pkg/reflect/asm_amd64p32.s @@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0 JMP runtime·newobject(SB) TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0 JMP runtime·newarray(SB) +TEXT ·makechan(SB),NOSPLIT,$0-0 + JMP runtime·makechan(SB) diff --git a/src/pkg/reflect/asm_arm.s b/src/pkg/reflect/asm_arm.s index fafc6f313ec..e621b169b1b 100644 --- a/src/pkg/reflect/asm_arm.s +++ b/src/pkg/reflect/asm_arm.s @@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0 B runtime·newobject(SB) TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0 B runtime·newarray(SB) +TEXT ·makechan(SB),NOSPLIT,$0-0 + B runtime·makechan(SB) diff --git a/src/pkg/runtime/chan.go b/src/pkg/runtime/chan.go new file mode 100644 index 00000000000..67427e960e6 --- /dev/null +++ b/src/pkg/runtime/chan.go @@ -0,0 +1,278 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime + +// This file contains the implementation of Go channels +// and select statements. + +import "unsafe" + +const ( + maxAlign = 8 + hchanSize = unsafe.Sizeof(hchan{}) + debugChan = false +) + +// TODO: make hchan.buf an unsafe.Pointer, not a *uint8 + +func makechan(t *chantype, size int64) *hchan { + elem := t.elem + + // compiler checks this but be safe. + if elem.size >= 1<<16 { + gothrow("makechan: invalid channel element type") + } + if hchanSize%maxAlign != 0 || elem.align > maxAlign { + gothrow("makechan: bad alignment") + } + if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (maxMem-hchanSize)/uintptr(elem.size)) { + panic("makechan: size out of range") + } + + var c *hchan + if elem.kind&kindNoPointers != 0 || size == 0 { + // allocate memory in one call + c = (*hchan)(gomallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan)) + if size > 0 && elem.size != 0 { + c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize)) + } else { + c.buf = (*uint8)(unsafe.Pointer(c)) // race detector uses this location for synchronization + } + } else { + c = new(hchan) + c.buf = (*uint8)(newarray(elem, uintptr(size))) + } + c.elemsize = uint16(elem.size) + c.elemtype = elem + c.dataqsiz = uint(size) + + if debugChan { + println("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size) + } + return c +} + +// chanbuf(c, i) is pointer to the i'th slot in the buffer. +func chanbuf(c *hchan, i uint) unsafe.Pointer { + return add(unsafe.Pointer(c.buf), uintptr(i)*uintptr(c.elemsize)) +} + +// entry point for c <- x from compiled code +//go:nosplit +func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) { + chansend(t, c, elem, true, gogetcallerpc(unsafe.Pointer(&t))) +} + +/* + * generic single channel send/recv + * If block is not nil, + * then the protocol will not + * sleep but return if it could + * not complete. + * + * sleep can wake up with g.param == nil + * when a channel involved in the sleep has + * been closed. it is easiest to loop and re-run + * the operation; we'll see that it's now closed. + */ +func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { + if raceenabled { + fn := chansend + pc := **(**uintptr)(unsafe.Pointer(&fn)) + raceReadObjectPC(t.elem, ep, callerpc, pc) + } + + if c == nil { + if !block { + return false + } + gopark(nil, nil, "chan send (nil chan)") + return false // not reached + } + + if debugChan { + println("chansend: chan=", c) + } + + var t0 int64 + if blockprofilerate > 0 { + t0 = gocputicks() + } + + golock(&c.lock) + if raceenabled { + fn := chansend + pc := **(**uintptr)(unsafe.Pointer(&fn)) + racereadpc(unsafe.Pointer(c), pc, callerpc) + } + if c.closed != 0 { + gounlock(&c.lock) + panic("send on closed channel") + } + + if c.dataqsiz == 0 { // synchronous channel + sg := c.recvq.dequeue() + if sg != nil { // found a waiting receiver + if raceenabled { + racesync(c, sg) + } + gounlock(&c.lock) + + recvg := sg.g + recvg.param = unsafe.Pointer(sg) + if sg.elem != nil { + memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize)) + } + if sg.releasetime != 0 { + // Yes, this is ugly. On 64-bit sg.releasetime has type + // int. On 32-bit it has type int64. There's no easy way + // to assign to both types in Go. At some point we'll + // write the Go types directly instead of generating them + // via the C types. At that point, this nastiness goes away. + *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks() + } + goready(recvg) + return true + } + + if !block { + gounlock(&c.lock) + return false + } + + // no receiver available: block on this channel. + gp := getg() + mysg := acquireSudog() + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = (*uint8)(ep) + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send") + + // someone woke us up. + if gp.param == nil { + if c.closed == 0 { + gothrow("chansend: spurious wakeup") + } + panic("send on closed channel") + } + if mysg.releasetime > 0 { + goblockevent(int64(mysg.releasetime)-t0, 3) + } + if mysg != gp.waiting { + gothrow("G waiting list is corrupted!") + } + gp.waiting = nil + releaseSudog(mysg) + return true + } + + // asynchronous channel + // wait for some space to write our data + var t1 int64 + for c.qcount >= c.dataqsiz { + if !block { + gounlock(&c.lock) + return false + } + gp := getg() + mysg := acquireSudog() + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.g = gp + mysg.elem = nil + mysg.selectdone = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send") + + // someone woke us up - try again + if mysg.releasetime != 0 { + t1 = int64(mysg.releasetime) + } + releaseSudog(mysg) + golock(&c.lock) + if c.closed != 0 { + gounlock(&c.lock) + panic("send on closed channel") + } + } + + // write our data into the channel buffer + if raceenabled { + raceacquire(chanbuf(c, c.sendx)) + racerelease(chanbuf(c, c.sendx)) + } + memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize)) + c.sendx++ + if c.sendx == c.dataqsiz { + c.sendx = 0 + } + c.qcount++ + + // wake up a waiting receiver + sg := c.recvq.dequeue() + if sg != nil { + recvg := sg.g + gounlock(&c.lock) + if sg.releasetime != 0 { + *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks() + } + goready(recvg) + } else { + gounlock(&c.lock) + } + if t1 > 0 { + goblockevent(t1-t0, 3) + } + return true +} + +func (q *waitq) enqueue(sgp *sudog) { + sgp.link = nil + if q.first == nil { + q.first = sgp + q.last = sgp + return + } + q.last.link = sgp + q.last = sgp +} + +func (q *waitq) dequeue() *sudog { + for { + sgp := q.first + if sgp == nil { + return nil + } + q.first = sgp.link + if q.last == sgp { + q.last = nil + } + + // if sgp participates in a select and is already signaled, ignore it + if sgp.selectdone != nil { + // claim the right to signal + if *sgp.selectdone != 0 || !gocas(sgp.selectdone, 0, 1) { + continue + } + } + + return sgp + } +} + +func racesync(c *hchan, sg *sudog) { + racerelease(chanbuf(c, 0)) + raceacquireg(sg.g, chanbuf(c, 0)) + racereleaseg(sg.g, chanbuf(c, 0)) + raceacquire(chanbuf(c, 0)) +} diff --git a/src/pkg/runtime/chan.goc b/src/pkg/runtime/chan.goc index 2ef1c8566e5..7f6373dc818 100644 --- a/src/pkg/runtime/chan.goc +++ b/src/pkg/runtime/chan.goc @@ -18,78 +18,6 @@ static SudoG* dequeue(WaitQ*); static void enqueue(WaitQ*, SudoG*); static void racesync(Hchan*, SudoG*); -static Type hchanType; -static String hchanStr; - -void -runtime·chaninit(void) -{ - int32 i, off; - byte *mask; - - // Generate (bare minimum) type descriptor for Hchan. - hchanType.size = sizeof(Hchan); - hchanStr = runtime·gostringnocopy((byte*)"chan"); - hchanType.string = &hchanStr; - // Hchan has only one interesting pointer -- buf. - off = offsetof(Hchan, buf)/PtrSize*gcBits; - if(off%8) - runtime·throw("makechan: unaligned buffer"); - if(off+8 >= sizeof(hchanType.gc)*8) - runtime·throw("makechan: gc mask does not fit"); - mask = (byte*)hchanType.gc; - for(i = 0; i < off/8; i++) - mask[i] = (BitsScalar<<2) | (BitsScalar<<6); - mask[off/8] = (BitsPointer<<2) | (BitsDead<<6); -} - -static Hchan* -makechan(ChanType *t, int64 hint) -{ - Hchan *c; - Type *elem; - - elem = t->elem; - - // compiler checks this but be safe. - if(elem->size >= (1<<16)) - runtime·throw("makechan: invalid channel element type"); - if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN) - runtime·throw("makechan: bad alignment"); - - if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > (MaxMem - sizeof(*c)) / elem->size)) - runtime·panicstring("makechan: size out of range"); - - if((elem->kind&KindNoPointers) || hint == 0) { - // allocate memory in one call - c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, nil, FlagNoScan); - if(hint > 0 && elem->size != 0) - c->buf = (byte*)(c+1); - else - c->buf = (byte*)c; // race detector uses this location for synchronization - } else { - c = (Hchan*)runtime·cnew(&hchanType); - c->buf = runtime·cnewarray(elem, hint); - } - c->elemsize = elem->size; - c->elemtype = elem; - c->dataqsiz = hint; - - if(debug) - runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; dataqsiz=%D\n", - c, (int64)elem->size, elem->alg, (int64)c->dataqsiz); - - return c; -} - -func reflect·makechan(t *ChanType, size uint64) (c *Hchan) { - c = makechan(t, size); -} - -func makechan(t *ChanType, size int64) (c *Hchan) { - c = makechan(t, size); -} - /* * generic single channel send/recv * if the bool pointer is nil, @@ -375,11 +303,6 @@ closed: return true; } -#pragma textflag NOSPLIT -func chansend1(t *ChanType, c *Hchan, elem *byte) { - chansend(t, c, elem, true, runtime·getcallerpc(&t)); -} - #pragma textflag NOSPLIT func chanrecv1(t *ChanType, c *Hchan, elem *byte) { chanrecv(t, c, elem, true, nil); diff --git a/src/pkg/runtime/chan.h b/src/pkg/runtime/chan.h index 5ac39cab88e..5ebbcfd4da1 100644 --- a/src/pkg/runtime/chan.h +++ b/src/pkg/runtime/chan.h @@ -5,22 +5,9 @@ #define MAXALIGN 8 typedef struct WaitQ WaitQ; -typedef struct SudoG SudoG; typedef struct Select Select; typedef struct Scase Scase; -// Known to compiler. -// Changes here must also be made in src/cmd/gc/select.c's selecttype. -struct SudoG -{ - G* g; - uint32* selectdone; - SudoG* link; - byte* elem; // data element - int64 releasetime; - int32 nrelease; // -1 for acquire -}; - struct WaitQ { SudoG* first; diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go index 9ffdc07dc72..bb0f28655db 100644 --- a/src/pkg/runtime/chan_test.go +++ b/src/pkg/runtime/chan_test.go @@ -430,6 +430,38 @@ func TestMultiConsumer(t *testing.T) { } } +func TestShrinkStackDuringBlockedSend(t *testing.T) { + // make sure that channel operations still work when we are + // blocked on a channel send and we shrink the stack. + // NOTE: this test probably won't fail unless stack.c:StackDebug + // is set to >= 1. + const n = 10 + c := make(chan int) + done := make(chan struct{}) + + go func() { + for i := 0; i < n; i++ { + c <- i + // use lots of stack, briefly. + stackGrowthRecursive(20) + } + done <- struct{}{} + }() + + for i := 0; i < n; i++ { + x := <-c + if x != i { + t.Errorf("bad channel read: want %d, got %d", i, x) + } + // Waste some time so sender can finish using lots of stack + // and block in channel send. + time.Sleep(1 * time.Millisecond) + // trigger GC which will shrink the stack of the sender. + runtime.GC() + } + <-done +} + func BenchmarkChanNonblocking(b *testing.B) { myc := make(chan int) b.RunParallel(func(pb *testing.PB) { diff --git a/src/pkg/runtime/mprof.goc b/src/pkg/runtime/mprof.goc index 3d8d790cdd7..57596b2231d 100644 --- a/src/pkg/runtime/mprof.goc +++ b/src/pkg/runtime/mprof.goc @@ -218,7 +218,10 @@ runtime·blockevent(int64 cycles, int32 skip) if(rate <= 0 || (rate > cycles && runtime·fastrand1()%rate > cycles)) return; - nstk = runtime·callers(skip, stk, nelem(stk)); + if(g->m->curg == nil || g->m->curg == g) + nstk = runtime·callers(skip, stk, nelem(stk)); + else + nstk = runtime·gcallers(g->m->curg, skip, stk, nelem(stk)); runtime·lock(&runtime·proflock); b = stkbucket(BProf, 0, stk, nstk, true); b->data.bp.count++; @@ -226,6 +229,12 @@ runtime·blockevent(int64 cycles, int32 skip) runtime·unlock(&runtime·proflock); } +void +runtime·blockevent_m(void) +{ + runtime·blockevent(g->m->scalararg[0] + ((int64)g->m->scalararg[1]<<32), g->m->scalararg[2]); +} + void runtime·iterate_memprof(void (*callback)(Bucket*, uintptr, uintptr*, uintptr, uintptr, uintptr)) { diff --git a/src/pkg/runtime/pprof/pprof_test.go b/src/pkg/runtime/pprof/pprof_test.go index aba538e7554..9ab211c2dab 100644 --- a/src/pkg/runtime/pprof/pprof_test.go +++ b/src/pkg/runtime/pprof/pprof_test.go @@ -287,7 +287,7 @@ func TestBlockProfile(t *testing.T) { `}, {"chan send", blockChanSend, ` [0-9]+ [0-9]+ @ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ -# 0x[0-9,a-f]+ runtime\.chansend1\+0x[0-9,a-f]+ .*/src/pkg/runtime/chan.goc:[0-9]+ +# 0x[0-9,a-f]+ runtime\.chansend1\+0x[0-9,a-f]+ .*/src/pkg/runtime/chan.go:[0-9]+ # 0x[0-9,a-f]+ runtime/pprof_test\.blockChanSend\+0x[0-9,a-f]+ .*/src/pkg/runtime/pprof/pprof_test.go:[0-9]+ # 0x[0-9,a-f]+ runtime/pprof_test\.TestBlockProfile\+0x[0-9,a-f]+ .*/src/pkg/runtime/pprof/pprof_test.go:[0-9]+ `}, diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c index 6767622846c..722f44bb1b0 100644 --- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -158,7 +158,6 @@ runtime·schedinit(void) runtime·symtabinit(); runtime·stackinit(); runtime·mallocinit(); - runtime·chaninit(); mcommoninit(g->m); // Initialize the itable value for newErrorCString, diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index dcce369a5cc..f12e50cbfb1 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -63,6 +63,7 @@ typedef uint8 byte; typedef struct Func Func; typedef struct G G; typedef struct Gobuf Gobuf; +typedef struct SudoG SudoG; typedef struct Lock Lock; typedef struct M M; typedef struct P P; @@ -217,6 +218,18 @@ struct Gobuf uintreg ret; uintptr lr; }; +// Known to compiler. +// Changes here must also be made in src/cmd/gc/select.c's selecttype. +struct SudoG +{ + G* g; + uint32* selectdone; + SudoG* link; + byte* elem; // data element + int64 releasetime; + int32 nrelease; // -1 for acquire + SudoG* waitlink; // G.waiting list +}; struct GCStats { // the struct must consist of only uint64's, @@ -285,6 +298,7 @@ struct G uintptr sigpc; uintptr gopc; // pc of go statement that created this goroutine uintptr racectx; + SudoG *waiting; // sudog structures this G is waiting on (that have a valid elem ptr) uintptr end[]; }; @@ -884,7 +898,6 @@ MCache* runtime·allocmcache(void); void runtime·freemcache(MCache*); void runtime·mallocinit(void); void runtime·gcinit(void); -void runtime·chaninit(void); void* runtime·mallocgc(uintptr size, Type* typ, uint32 flag); void runtime·runpanic(Panic*); uintptr runtime·getcallersp(void*); diff --git a/src/pkg/runtime/stack.c b/src/pkg/runtime/stack.c index aeb5fb72115..fc11d98c9b6 100644 --- a/src/pkg/runtime/stack.c +++ b/src/pkg/runtime/stack.c @@ -268,8 +268,10 @@ runtime·stackfree(G *gp, void *v, Stktop *top) n = (uintptr)(top+1) - (uintptr)v; if(n & (n-1)) runtime·throw("stack not a power of 2"); - if(StackDebug >= 1) + if(StackDebug >= 1) { runtime·printf("stackfree %p %d\n", v, (int32)n); + runtime·memclr(v, n); // for testing, clobber stack data + } gp->stacksize -= n; if(runtime·debug.efence || StackFromSystem) { if(runtime·debug.efence || StackFaultOnFree) @@ -753,6 +755,21 @@ adjustdefers(G *gp, AdjustInfo *adjinfo) } } +static void +adjustsudogs(G *gp, AdjustInfo *adjinfo) +{ + SudoG *s; + byte *e; + + // the data elements pointed to by a SudoG structure + // might be in the stack. + for(s = gp->waiting; s != nil; s = s->waitlink) { + e = s->elem; + if(adjinfo->oldstk <= e && e < adjinfo->oldbase) + s->elem = e + adjinfo->delta; + } +} + // Copies the top stack segment of gp to a new stack segment of a // different size. The top segment must contain nframes frames. static void @@ -791,6 +808,7 @@ copystack(G *gp, uintptr nframes, uintptr newsize) // adjust other miscellaneous things that have pointers into stacks. adjustctxt(gp, &adjinfo); adjustdefers(gp, &adjinfo); + adjustsudogs(gp, &adjinfo); // copy the stack (including Stktop) to the new location runtime·memmove(newbase - used, oldbase - used, used); @@ -1069,6 +1087,8 @@ runtime·shrinkstack(G *gp) if(gp->m != nil && gp->m->libcallsp != 0) return; #endif + if(StackDebug > 0) + runtime·printf("shrinking stack %D->%D\n", (uint64)oldsize, (uint64)newsize); nframes = copyabletopsegment(gp); if(nframes == -1) return; diff --git a/src/pkg/runtime/stubs.go b/src/pkg/runtime/stubs.go index a31589ca86c..e7d7c38bf1c 100644 --- a/src/pkg/runtime/stubs.go +++ b/src/pkg/runtime/stubs.go @@ -30,6 +30,18 @@ func racereadrangepc(addr unsafe.Pointer, len int, callpc, pc uintptr) //go:noescape func racewriterangepc(addr unsafe.Pointer, len int, callpc, pc uintptr) +//go:noescape +func raceacquire(addr unsafe.Pointer) + +//go:noescape +func racerelease(addr unsafe.Pointer) + +//go:noescape +func raceacquireg(gp *g, addr unsafe.Pointer) + +//go:noescape +func racereleaseg(gp *g, addr unsafe.Pointer) + // Should be a built-in for unsafe.Pointer? func add(p unsafe.Pointer, x uintptr) unsafe.Pointer { return unsafe.Pointer(uintptr(p) + x)