runtime: make shrinkstack concurrent-safe
Currently shrinkstack is only safe during STW because it adjusts channel-related stack pointers and moves send/receive stack slots without synchronizing with the channel code. Make it safe to use when the world isn't stopped by:

1) Locking all channels the G is blocked on while adjusting the sudogs and copying the area of the stack that may contain send/receive slots.

2) For any stack frames that may contain send/receive slots, using an atomic CAS to adjust pointers. This prevents races between adjusting a pointer in a receive slot and a concurrent send writing to that receive slot.

In principle, the synchronization could be finer-grained. For example, we considered synchronizing around the sudogs, which would allow channel operations involving other Gs to continue if the G being shrunk was far enough down the send/receive queue. However, using the channel lock means no additional locks are necessary in the channel code. Furthermore, the stack shrinking code holds the channel lock for a very short time (much less than the time required to shrink the stack).

This does not yet make stack shrinking concurrent; it merely makes doing so safe.

This has negligible effect on the go1 and garbage benchmarks.

For #12967.

Change-Id: Ia49df3a8a7be4b36e365aac4155a2416b94b988c
Reviewed-on: https://go-review.googlesource.com/20042
Reviewed-by: Keith Randall <khr@golang.org>
Run-TryBot: Austin Clements <austin@google.com>
This commit is contained in:
parent d45bf7228f
commit 276b177771
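To make the CAS half of the scheme concrete, here is a minimal, self-contained sketch of the retry loop described above. This is not the runtime's code: adjustSlot and its parameters are invented stand-ins for adjustpointers' locals (minp/maxp bound the old stack, delta is newbase - oldbase), and the uintptr-to-Pointer conversions it performs are only legitimate inside the runtime (go vet would flag them in ordinary code).

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// adjustSlot relocates the pointer in *slot by delta, but only while it
// still points into the old stack [minp, maxp). A failed CAS means a
// concurrent sender stored a new value into the slot, so the loop
// re-examines the slot's contents from scratch.
func adjustSlot(slot *unsafe.Pointer, minp, maxp, delta uintptr) {
	for {
		p := uintptr(atomic.LoadPointer(slot))
		if p < minp || p >= maxp {
			// Not an old-stack pointer (e.g., a value a concurrent
			// send just wrote); leave it alone.
			return
		}
		oldp := unsafe.Pointer(p)
		newp := unsafe.Pointer(p + delta)
		if atomic.CompareAndSwapPointer(slot, oldp, newp) {
			return
		}
		// Lost a race with a concurrent write; retry.
	}
}

func main() {
	x := 42
	slot := unsafe.Pointer(&x)
	lo := uintptr(unsafe.Pointer(&x))
	// delta is zero so this demo stays memory-safe; in the runtime it is
	// the distance between the new and old stacks.
	adjustSlot(&slot, lo, lo+1, 0)
	fmt.Println(*(*int)(slot)) // prints 42
}

The channel-lock half of the scheme (syncadjustsudogs in the diff below) needs no such loop: while the channel locks are held, no send or receive can touch the slots at all, so plain writes suffice there.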
src/runtime/chan_test.go
@@ -586,6 +586,67 @@ func TestSelectDuplicateChannel(t *testing.T) {
 	c <- 8 // wake up B. This operation used to fail because c.recvq was corrupted (it tries to wake up an already running G instead of B)
 }
 
+var selectSink interface{}
+
+func TestSelectStackAdjust(t *testing.T) {
+	// Test that channel receive slots that contain local stack
+	// pointers are adjusted correctly by stack shrinking.
+	c := make(chan *int)
+	d := make(chan *int)
+	ready := make(chan bool)
+	go func() {
+		// Temporarily grow the stack to 10K.
+		stackGrowthRecursive((10 << 10) / (128 * 8))
+
+		// We're ready to trigger GC and stack shrink.
+		ready <- true
+
+		val := 42
+		var cx *int
+		cx = &val
+		// Receive from d. cx won't be affected.
+		select {
+		case cx = <-c:
+		case <-d:
+		}
+
+		// Check that pointer in cx was adjusted correctly.
+		if cx != &val {
+			t.Error("cx no longer points to val")
+		} else if val != 42 {
+			t.Error("val changed")
+		} else {
+			*cx = 43
+			if val != 43 {
+				t.Error("changing *cx failed to change val")
+			}
+		}
+		ready <- true
+	}()
+
+	// Let the goroutine get into the select.
+	<-ready
+	time.Sleep(10 * time.Millisecond)
+
+	// Force concurrent GC a few times.
+	var before, after runtime.MemStats
+	runtime.ReadMemStats(&before)
+	for i := 0; i < 100; i++ {
+		selectSink = new([1 << 20]byte)
+		runtime.ReadMemStats(&after)
+		if after.NumGC-before.NumGC >= 2 {
+			goto done
+		}
+	}
+	t.Fatal("failed to trigger concurrent GC")
+done:
+	selectSink = nil
+
+	// Wake select.
+	d <- nil
+	<-ready
+}
+
 func BenchmarkChanNonblocking(b *testing.B) {
 	myc := make(chan int)
 	b.RunParallel(func(pb *testing.PB) {
src/runtime/runtime2.go
@@ -215,7 +215,8 @@ type gobuf struct {
 // selecttype.
 type sudog struct {
 	// The following fields are protected by the hchan.lock of the
-	// channel this sudog is blocking on.
+	// channel this sudog is blocking on. shrinkstack depends on
+	// this.
 
 	g          *g
 	selectdone *uint32 // CAS to 1 to win select race (may point to stack)
src/runtime/stack.go
@@ -516,6 +516,9 @@ type adjustinfo struct {
 	old   stack
 	delta uintptr // ptr distance from old to new stack (newbase - oldbase)
 	cache pcvalueCache
+
+	// sghi is the highest sudog.elem on the stack.
+	sghi uintptr
 }
 
 // Adjustpointer checks whether *vpp is in the old stack described by adjinfo.
@@ -564,12 +567,19 @@ func adjustpointers(scanp unsafe.Pointer, cbv *bitvector, adjinfo *adjustinfo, f
 	maxp := adjinfo.old.hi
 	delta := adjinfo.delta
 	num := bv.n
+	// If this frame might contain channel receive slots, use CAS
+	// to adjust pointers. If the slot hasn't been received into
+	// yet, it may contain stack pointers and a concurrent send
+	// could race with adjusting those pointers. (The sent value
+	// itself can never contain stack pointers.)
+	useCAS := uintptr(scanp) < adjinfo.sghi
 	for i := uintptr(0); i < num; i++ {
 		if stackDebug >= 4 {
 			print("        ", add(scanp, i*sys.PtrSize), ":", ptrnames[ptrbit(&bv, i)], ":", hex(*(*uintptr)(add(scanp, i*sys.PtrSize))), " # ", i, " ", bv.bytedata[i/8], "\n")
 		}
 		if ptrbit(&bv, i) == 1 {
 			pp := (*uintptr)(add(scanp, i*sys.PtrSize))
+		retry:
 			p := *pp
 			if f != nil && 0 < p && p < _PageSize && debug.invalidptr != 0 || p == poisonStack {
 				// Looks like a junk value in a pointer slot.
@@ -582,7 +592,14 @@ func adjustpointers(scanp unsafe.Pointer, cbv *bitvector, adjinfo *adjustinfo, f
 			if stackDebug >= 3 {
 				print("adjust ptr ", p, " ", funcname(f), "\n")
 			}
-			*pp = p + delta
+			if useCAS {
+				ppu := (*unsafe.Pointer)(unsafe.Pointer(pp))
+				if !atomic.Casp1(ppu, unsafe.Pointer(p), unsafe.Pointer(p+delta)) {
+					goto retry
+				}
+			} else {
+				*pp = p + delta
+			}
 		}
 	}
 }
@@ -727,9 +744,68 @@ func fillstack(stk stack, b byte) {
 	}
 }
 
+func findsghi(gp *g, stk stack) uintptr {
+	var sghi uintptr
+	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
+		p := uintptr(sg.elem) + uintptr(sg.c.elemsize)
+		if stk.lo <= p && p < stk.hi && p > sghi {
+			sghi = p
+		}
+		p = uintptr(unsafe.Pointer(sg.selectdone)) + unsafe.Sizeof(sg.selectdone)
+		if stk.lo <= p && p < stk.hi && p > sghi {
+			sghi = p
+		}
+	}
+	return sghi
+}
+
+// syncadjustsudogs adjusts gp's sudogs and copies the part of gp's
+// stack they refer to while synchronizing with concurrent channel
+// operations. It returns the number of bytes of stack copied.
+func syncadjustsudogs(gp *g, used uintptr, adjinfo *adjustinfo) uintptr {
+	if gp.waiting == nil {
+		return 0
+	}
+
+	// Lock channels to prevent concurrent send/receive.
+	// It's important that we *only* do this for async
+	// copystack; otherwise, gp may be in the middle of
+	// putting itself on wait queues and this would
+	// self-deadlock.
+	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
+		lock(&sg.c.lock)
+	}
+
+	// Adjust sudogs.
+	adjustsudogs(gp, adjinfo)
+
+	// Copy the part of the stack the sudogs point in to
+	// while holding the lock to prevent races on
+	// send/receive slots.
+	var sgsize uintptr
+	if adjinfo.sghi != 0 {
+		oldBot := adjinfo.old.hi - used
+		newBot := oldBot + adjinfo.delta
+		sgsize = adjinfo.sghi - oldBot
+		memmove(unsafe.Pointer(newBot), unsafe.Pointer(oldBot), sgsize)
+	}
+
+	// Unlock channels.
+	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
+		unlock(&sg.c.lock)
+	}
+
+	return sgsize
+}
+
 // Copies gp's stack to a new stack of a different size.
 // Caller must have changed gp status to Gcopystack.
-func copystack(gp *g, newsize uintptr) {
+//
+// If sync is true, this is a self-triggered stack growth and, in
+// particular, no other G may be writing to gp's stack (e.g., via a
+// channel operation). If sync is false, copystack protects against
+// concurrent channel operations.
+func copystack(gp *g, newsize uintptr, sync bool) {
 	if gp.syscallsp != 0 {
 		throw("stack growth not allowed in system call")
 	}
@@ -753,21 +829,41 @@ func copystack(gp *g, newsize uintptr) {
 	adjinfo.old = old
 	adjinfo.delta = new.hi - old.hi
 
-	// copy the stack to the new location
-	memmove(unsafe.Pointer(new.hi-used), unsafe.Pointer(old.hi-used), used)
+	// Adjust sudogs, synchronizing with channel ops if necessary.
+	ncopy := used
+	if sync {
+		adjustsudogs(gp, &adjinfo)
+	} else {
+		// sudogs can point in to the stack. During concurrent
+		// shrinking, these areas may be written to. Find the
+		// highest such pointer so we can handle everything
+		// there and below carefully. (This shouldn't be far
+		// from the bottom of the stack, so there's little
+		// cost in handling everything below it carefully.)
+		adjinfo.sghi = findsghi(gp, old)
+
+		// Synchronize with channel ops and copy the part of
+		// the stack they may interact with.
+		ncopy -= syncadjustsudogs(gp, used, &adjinfo)
+	}
+
+	// Copy the stack (or the rest of it) to the new location
+	memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)
 
 	// Disallow sigprof scans of this stack and block if there's
 	// one in progress.
 	gcLockStackBarriers(gp)
 
-	// Adjust structures that have pointers into stacks. We have
-	// to do most of these before we traceback the new stack
-	// because gentraceback uses them.
+	// Adjust remaining structures that have pointers into stacks.
+	// We have to do most of these before we traceback the new
+	// stack because gentraceback uses them.
 	adjustctxt(gp, &adjinfo)
 	adjustdefers(gp, &adjinfo)
 	adjustpanics(gp, &adjinfo)
-	adjustsudogs(gp, &adjinfo)
 	adjuststkbar(gp, &adjinfo)
+	if adjinfo.sghi != 0 {
+		adjinfo.sghi += adjinfo.delta
+	}
 
 	// copy old stack barriers to new stack barrier array
 	newstkbar = newstkbar[:len(gp.stkbar)]
@@ -944,7 +1040,7 @@ func newstack() {
 
 	// The concurrent GC will not scan the stack while we are doing the copy since
 	// the gp is in a Gcopystack status.
-	copystack(gp, uintptr(newsize))
+	copystack(gp, uintptr(newsize), true)
 	if stackDebug >= 1 {
 		print("stack grow done\n")
 	}
@@ -971,6 +1067,7 @@ func gostartcallfn(gobuf *gobuf, fv *funcval) {
 
 // Maybe shrink the stack being used by gp.
 // Called at garbage collection time.
+// gp must be stopped, but the world need not be.
 func shrinkstack(gp *g) {
 	if readgstatus(gp) == _Gdead {
 		if gp.stack.lo != 0 {
@@ -1023,7 +1120,7 @@ func shrinkstack(gp *g) {
 	}
 
 	oldstatus := casgcopystack(gp)
-	copystack(gp, newsize)
+	copystack(gp, newsize, false)
 	casgstatus(gp, _Gcopystack, oldstatus)
 }