1
0
mirror of https://github.com/golang/go synced 2024-11-26 07:57:57 -07:00

runtime: avoid multiple records with identical stacks from MutexProfile

When using frame pointer unwinding, we defer frame skipping and inline
expansion for call stacks until profile reporting time. We can end up
with records which have different stacks if no frames are skipped, but
identical stacks once skipping is taken into account. Returning multiple
records with the same stack (but different values) has broken programs
which rely on the records already being fully aggregated by call stack
when returned from runtime.MutexProfile. This CL addresses the problem
by handling skipping at recording time. We do full inline expansion to
correctly skip the desired number of frames when recording the call
stack, and then handle the rest of inline expansion when reporting the
profile.

The regression test in this CL is adapted from the reproducer in
https://github.com/grafana/pyroscope-go/issues/103, authored by Tolya
Korniltsev.

Fixes #67548

This reapplies CL 595966.
The original version of this CL introduced a bounds error in
MutexProfile and failed to correctly expand inlined frames from that
call. This CL applies the original CL, fixing the bounds error and
adding a test for the output of MutexProfile to ensure the frames are
expanded properly.

Change-Id: I5ef8aafb9f88152a704034065c0742ba767c4dbb
Reviewed-on: https://go-review.googlesource.com/c/go/+/598515
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
Reviewed-by: Cherry Mui <cherryyz@google.com>
This commit is contained in:
Nick Ripley 2024-07-16 07:15:22 -04:00 committed by Cherry Mui
parent 8c88f0c736
commit 87abb4afb6
3 changed files with 282 additions and 18 deletions

View File

@ -9,6 +9,7 @@ package runtime
import ( import (
"internal/abi" "internal/abi"
"internal/goarch"
"internal/profilerecord" "internal/profilerecord"
"internal/runtime/atomic" "internal/runtime/atomic"
"runtime/internal/sys" "runtime/internal/sys"
@ -542,16 +543,14 @@ func saveblockevent(cycles, rate int64, skip int, which bucketType) {
gp := getg() gp := getg()
mp := acquirem() // we must not be preempted while accessing profstack mp := acquirem() // we must not be preempted while accessing profstack
nstk := 1 var nstk int
if tracefpunwindoff() || gp.m.hasCgoOnStack() { if tracefpunwindoff() || gp.m.hasCgoOnStack() {
mp.profStack[0] = logicalStackSentinel
if gp.m.curg == nil || gp.m.curg == gp { if gp.m.curg == nil || gp.m.curg == gp {
nstk = callers(skip, mp.profStack[1:]) nstk = callers(skip, mp.profStack)
} else { } else {
nstk = gcallers(gp.m.curg, skip, mp.profStack[1:]) nstk = gcallers(gp.m.curg, skip, mp.profStack)
} }
} else { } else {
mp.profStack[0] = uintptr(skip)
if gp.m.curg == nil || gp.m.curg == gp { if gp.m.curg == nil || gp.m.curg == gp {
if skip > 0 { if skip > 0 {
// We skip one fewer frame than the provided value for frame // We skip one fewer frame than the provided value for frame
@ -559,12 +558,12 @@ func saveblockevent(cycles, rate int64, skip int, which bucketType) {
// frame, whereas the saved frame pointer will give us the // frame, whereas the saved frame pointer will give us the
// caller's return address first (so, not including // caller's return address first (so, not including
// saveblockevent) // saveblockevent)
mp.profStack[0] -= 1 skip -= 1
} }
nstk += fpTracebackPCs(unsafe.Pointer(getfp()), mp.profStack[1:]) nstk = fpTracebackPartialExpand(skip, unsafe.Pointer(getfp()), mp.profStack)
} else { } else {
mp.profStack[1] = gp.m.curg.sched.pc mp.profStack[0] = gp.m.curg.sched.pc
nstk += 1 + fpTracebackPCs(unsafe.Pointer(gp.m.curg.sched.bp), mp.profStack[2:]) nstk = 1 + fpTracebackPartialExpand(skip, unsafe.Pointer(gp.m.curg.sched.bp), mp.profStack[1:])
} }
} }
@ -572,6 +571,52 @@ func saveblockevent(cycles, rate int64, skip int, which bucketType) {
releasem(mp) releasem(mp)
} }
// fpTracebackPartialExpand records a call stack obtained starting from fp.
// This function will skip the given number of frames, properly accounting for
// inlining, and save remaining frames as "physical" return addresses. The
// consumer should later use CallersFrames or similar to expand inline frames.
func fpTracebackPartialExpand(skip int, fp unsafe.Pointer, pcBuf []uintptr) int {
var n int
lastFuncID := abi.FuncIDNormal
skipOrAdd := func(retPC uintptr) bool {
if skip > 0 {
skip--
} else if n < len(pcBuf) {
pcBuf[n] = retPC
n++
}
return n < len(pcBuf)
}
for n < len(pcBuf) && fp != nil {
// return addr sits one word above the frame pointer
pc := *(*uintptr)(unsafe.Pointer(uintptr(fp) + goarch.PtrSize))
if skip > 0 {
callPC := pc - 1
fi := findfunc(callPC)
u, uf := newInlineUnwinder(fi, callPC)
for ; uf.valid(); uf = u.next(uf) {
sf := u.srcFunc(uf)
if sf.funcID == abi.FuncIDWrapper && elideWrapperCalling(lastFuncID) {
// ignore wrappers
} else if more := skipOrAdd(uf.pc + 1); !more {
return n
}
lastFuncID = sf.funcID
}
} else {
// We've skipped the desired number of frames, so no need
// to perform further inline expansion now.
pcBuf[n] = pc
n++
}
// follow the frame pointer to the next one
fp = unsafe.Pointer(*(*uintptr)(fp))
}
return n
}
// lockTimer assists with profiling contention on runtime-internal locks. // lockTimer assists with profiling contention on runtime-internal locks.
// //
// There are several steps between the time that an M experiences contention and // There are several steps between the time that an M experiences contention and
@ -1075,10 +1120,34 @@ type BlockProfileRecord struct {
// the [testing] package's -test.blockprofile flag instead // the [testing] package's -test.blockprofile flag instead
// of calling BlockProfile directly. // of calling BlockProfile directly.
func BlockProfile(p []BlockProfileRecord) (n int, ok bool) { func BlockProfile(p []BlockProfileRecord) (n int, ok bool) {
return blockProfileInternal(len(p), func(r profilerecord.BlockProfileRecord) { var m int
copyBlockProfileRecord(&p[0], r) n, ok = blockProfileInternal(len(p), func(r profilerecord.BlockProfileRecord) {
p = p[1:] copyBlockProfileRecord(&p[m], r)
m++
}) })
if ok {
expandFrames(p[:n])
}
return
}
func expandFrames(p []BlockProfileRecord) {
expandedStack := makeProfStack()
for i := range p {
cf := CallersFrames(p[i].Stack())
j := 0
for ; j < len(expandedStack); j++ {
f, more := cf.Next()
// f.PC is a "call PC", but later consumers will expect
// "return PCs"
expandedStack[j] = f.PC + 1
if !more {
break
}
}
k := copy(p[i].Stack0[:], expandedStack[:j])
clear(p[i].Stack0[k:])
}
} }
// blockProfileInternal returns the number of records n in the profile. If there // blockProfileInternal returns the number of records n in the profile. If there
@ -1111,6 +1180,9 @@ func blockProfileInternal(size int, copyFn func(profilerecord.BlockProfileRecord
return return
} }
// copyBlockProfileRecord copies the sample values and call stack from src to dst.
// The call stack is copied as-is. The caller is responsible for handling inline
// expansion, needed when the call stack was collected with frame pointer unwinding.
func copyBlockProfileRecord(dst *BlockProfileRecord, src profilerecord.BlockProfileRecord) { func copyBlockProfileRecord(dst *BlockProfileRecord, src profilerecord.BlockProfileRecord) {
dst.Count = src.Count dst.Count = src.Count
dst.Cycles = src.Cycles dst.Cycles = src.Cycles
@ -1123,7 +1195,11 @@ func copyBlockProfileRecord(dst *BlockProfileRecord, src profilerecord.BlockProf
if asanenabled { if asanenabled {
asanwrite(unsafe.Pointer(&dst.Stack0[0]), unsafe.Sizeof(dst.Stack0)) asanwrite(unsafe.Pointer(&dst.Stack0[0]), unsafe.Sizeof(dst.Stack0))
} }
i := fpunwindExpand(dst.Stack0[:], src.Stack) // We just copy the stack here without inline expansion
// (needed if frame pointer unwinding is used)
// since this function is called under the profile lock,
// and doing something that might allocate can violate lock ordering.
i := copy(dst.Stack0[:], src.Stack)
clear(dst.Stack0[i:]) clear(dst.Stack0[i:])
} }
@ -1142,10 +1218,15 @@ func pprof_blockProfileInternal(p []profilerecord.BlockProfileRecord) (n int, ok
// Most clients should use the [runtime/pprof] package // Most clients should use the [runtime/pprof] package
// instead of calling MutexProfile directly. // instead of calling MutexProfile directly.
func MutexProfile(p []BlockProfileRecord) (n int, ok bool) { func MutexProfile(p []BlockProfileRecord) (n int, ok bool) {
return mutexProfileInternal(len(p), func(r profilerecord.BlockProfileRecord) { var m int
copyBlockProfileRecord(&p[0], r) n, ok = mutexProfileInternal(len(p), func(r profilerecord.BlockProfileRecord) {
p = p[1:] copyBlockProfileRecord(&p[m], r)
m++
}) })
if ok {
expandFrames(p[:n])
}
return
} }
// mutexProfileInternal returns the number of records n in the profile. If there // mutexProfileInternal returns the number of records n in the profile. If there

View File

@ -404,6 +404,25 @@ type countProfile interface {
Label(i int) *labelMap Label(i int) *labelMap
} }
// expandInlinedFrames copies the call stack from pcs into dst, expanding any
// PCs corresponding to inlined calls into the corresponding PCs for the inlined
// functions. Returns the number of frames copied to dst.
func expandInlinedFrames(dst, pcs []uintptr) int {
cf := runtime.CallersFrames(pcs)
var n int
for n < len(dst) {
f, more := cf.Next()
// f.PC is a "call PC", but later consumers will expect
// "return PCs"
dst[n] = f.PC + 1
n++
if !more {
break
}
}
return n
}
// printCountCycleProfile outputs block profile records (for block or mutex profiles) // printCountCycleProfile outputs block profile records (for block or mutex profiles)
// as the pprof-proto format output. Translations from cycle count to time duration // as the pprof-proto format output. Translations from cycle count to time duration
// are done because The proto expects count and time (nanoseconds) instead of count // are done because The proto expects count and time (nanoseconds) instead of count
@ -426,7 +445,7 @@ func printCountCycleProfile(w io.Writer, countName, cycleName string, records []
values[1] = int64(float64(r.Cycles) / cpuGHz) values[1] = int64(float64(r.Cycles) / cpuGHz)
// For count profiles, all stack addresses are // For count profiles, all stack addresses are
// return PCs, which is what appendLocsForStack expects. // return PCs, which is what appendLocsForStack expects.
n := pprof_fpunwindExpand(expandedStack[:], r.Stack) n := expandInlinedFrames(expandedStack, r.Stack)
locs = b.appendLocsForStack(locs[:0], expandedStack[:n]) locs = b.appendLocsForStack(locs[:0], expandedStack[:n])
b.pbSample(values, locs, nil) b.pbSample(values, locs, nil)
} }
@ -935,7 +954,7 @@ func writeProfileInternal(w io.Writer, debug int, name string, runtimeProfile fu
for i := range p { for i := range p {
r := &p[i] r := &p[i]
fmt.Fprintf(w, "%v %v @", r.Cycles, r.Count) fmt.Fprintf(w, "%v %v @", r.Cycles, r.Count)
n := pprof_fpunwindExpand(expandedStack, r.Stack) n := expandInlinedFrames(expandedStack, r.Stack)
stack := expandedStack[:n] stack := expandedStack[:n]
for _, pc := range stack { for _, pc := range stack {
fmt.Fprintf(w, " %#x", pc) fmt.Fprintf(w, " %#x", pc)

View File

@ -21,6 +21,7 @@ import (
"regexp" "regexp"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -2578,3 +2579,166 @@ func produceProfileEvents(t *testing.T, depth int) {
runtime.GC() runtime.GC()
goroutineDeep(t, depth-4) // -4 for produceProfileEvents, **, chanrecv1, chanrev, gopark goroutineDeep(t, depth-4) // -4 for produceProfileEvents, **, chanrecv1, chanrev, gopark
} }
func getProfileStacks(collect func([]runtime.BlockProfileRecord) (int, bool), fileLine bool) []string {
var n int
var ok bool
var p []runtime.BlockProfileRecord
for {
p = make([]runtime.BlockProfileRecord, n)
n, ok = collect(p)
if ok {
p = p[:n]
break
}
}
var stacks []string
for _, r := range p {
var stack strings.Builder
for i, pc := range r.Stack() {
if i > 0 {
stack.WriteByte('\n')
}
// Use FuncForPC instead of CallersFrames,
// because we want to see the info for exactly
// the PCs returned by the mutex profile to
// ensure inlined calls have already been properly
// expanded.
f := runtime.FuncForPC(pc - 1)
stack.WriteString(f.Name())
if fileLine {
stack.WriteByte(' ')
file, line := f.FileLine(pc - 1)
stack.WriteString(file)
stack.WriteByte(':')
stack.WriteString(strconv.Itoa(line))
}
}
stacks = append(stacks, stack.String())
}
return stacks
}
func TestMutexBlockFullAggregation(t *testing.T) {
// This regression test is adapted from
// https://github.com/grafana/pyroscope-go/issues/103,
// authored by Tolya Korniltsev
var m sync.Mutex
prev := runtime.SetMutexProfileFraction(-1)
defer runtime.SetMutexProfileFraction(prev)
const fraction = 1
const iters = 100
const workers = 2
runtime.SetMutexProfileFraction(fraction)
runtime.SetBlockProfileRate(1)
defer runtime.SetBlockProfileRate(0)
wg := sync.WaitGroup{}
wg.Add(workers)
for j := 0; j < workers; j++ {
go func() {
for i := 0; i < iters; i++ {
m.Lock()
// Wait at least 1 millisecond to pass the
// starvation threshold for the mutex
time.Sleep(time.Millisecond)
m.Unlock()
}
wg.Done()
}()
}
wg.Wait()
assertNoDuplicates := func(name string, collect func([]runtime.BlockProfileRecord) (int, bool)) {
stacks := getProfileStacks(collect, true)
seen := make(map[string]struct{})
for _, s := range stacks {
if _, ok := seen[s]; ok {
t.Errorf("saw duplicate entry in %s profile with stack:\n%s", name, s)
}
seen[s] = struct{}{}
}
if len(seen) == 0 {
t.Errorf("did not see any samples in %s profile for this test", name)
}
}
t.Run("mutex", func(t *testing.T) {
assertNoDuplicates("mutex", runtime.MutexProfile)
})
t.Run("block", func(t *testing.T) {
assertNoDuplicates("block", runtime.BlockProfile)
})
}
func inlineA(mu *sync.Mutex, wg *sync.WaitGroup) { inlineB(mu, wg) }
func inlineB(mu *sync.Mutex, wg *sync.WaitGroup) { inlineC(mu, wg) }
func inlineC(mu *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
mu.Unlock()
}
func inlineD(mu *sync.Mutex, wg *sync.WaitGroup) { inlineE(mu, wg) }
func inlineE(mu *sync.Mutex, wg *sync.WaitGroup) { inlineF(mu, wg) }
func inlineF(mu *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()
mu.Unlock()
}
func TestBlockMutexProfileInlineExpansion(t *testing.T) {
runtime.SetBlockProfileRate(1)
defer runtime.SetBlockProfileRate(0)
prev := runtime.SetMutexProfileFraction(1)
defer runtime.SetMutexProfileFraction(prev)
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(2)
mu.Lock()
go inlineA(&mu, &wg)
awaitBlockedGoroutine(t, "sync.Mutex.Lock", "inlineC", 1)
// inlineD will unblock inlineA
go inlineD(&mu, &wg)
wg.Wait()
tcs := []struct {
Name string
Collect func([]runtime.BlockProfileRecord) (int, bool)
SubStack string
}{
{
Name: "mutex",
Collect: runtime.MutexProfile,
SubStack: `sync.(*Mutex).Unlock
runtime/pprof.inlineF
runtime/pprof.inlineE
runtime/pprof.inlineD`,
},
{
Name: "block",
Collect: runtime.BlockProfile,
SubStack: `sync.(*Mutex).Lock
runtime/pprof.inlineC
runtime/pprof.inlineB
runtime/pprof.inlineA`,
},
}
for _, tc := range tcs {
t.Run(tc.Name, func(t *testing.T) {
stacks := getProfileStacks(tc.Collect, false)
for _, s := range stacks {
if strings.Contains(s, tc.SubStack) {
return
}
}
t.Error("did not see expected stack")
t.Logf("wanted:\n%s", tc.SubStack)
t.Logf("got: %s", stacks)
})
}
}