mirror of
https://github.com/golang/go
synced 2024-11-19 07:04:43 -07:00
runtime: remove futile wakeups from trace
Channels and sync.Mutex'es allow another goroutine to acquire resource ahead of an unblocked goroutine. This is good for performance, but leads to futile wakeups (the unblocked goroutine needs to block again). Futile wakeups caused user confusion during the very first evaluation of tracing functionality on a real server (a goroutine as if acquires a mutex in a loop, while there is no loop in user code). This change detects futile wakeups on channels and emits a special event to denote the fact. Later parser finds entire wakeup sequences (unblock->start->block) and removes them. sync.Mutex will be supported in a separate change. Change-Id: Iaaaee9d5c0921afc62b449a97447445030ac19d3 Reviewed-on: https://go-review.googlesource.com/7380 Reviewed-by: Keith Randall <khr@golang.org>
This commit is contained in:
parent
1b49a86ece
commit
4396ea96c4
@ -56,7 +56,7 @@ const (
|
||||
SyscallP // depicts returns from syscalls
|
||||
)
|
||||
|
||||
// parseTrace parses, post-processes and verifies the trace.
|
||||
// Parse parses, post-processes and verifies the trace.
|
||||
func Parse(r io.Reader) ([]*Event, error) {
|
||||
rawEvents, err := readTrace(r)
|
||||
if err != nil {
|
||||
@ -66,6 +66,10 @@ func Parse(r io.Reader) ([]*Event, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events, err = removeFutile(events)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = postProcessTrace(events)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -265,6 +269,61 @@ func parseEvents(rawEvents []rawEvent) (events []*Event, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// removeFutile removes all constituents of futile wakeups (block, unblock, start).
|
||||
// For example, a goroutine was unblocked on a mutex, but another goroutine got
|
||||
// ahead and acquired the mutex before the first goroutine is scheduled,
|
||||
// so the first goroutine has to block again. Such wakeups happen on buffered
|
||||
// channels and sync.Mutex, but are generally not interesting for end user.
|
||||
func removeFutile(events []*Event) ([]*Event, error) {
|
||||
// Two non-trivial aspects:
|
||||
// 1. A goroutine can be preempted during a futile wakeup and migrate to another P.
|
||||
// We want to remove all of that.
|
||||
// 2. Tracing can start in the middle of a futile wakeup.
|
||||
// That is, we can see a futile wakeup event w/o the actual wakeup before it.
|
||||
// postProcessTrace runs after us and ensures that we leave the trace in a consistent state.
|
||||
|
||||
// Phase 1: determine futile wakeup sequences.
|
||||
type G struct {
|
||||
futile bool
|
||||
wakeup []*Event // wakeup sequence (subject for removal)
|
||||
}
|
||||
gs := make(map[uint64]G)
|
||||
futile := make(map[*Event]bool)
|
||||
for _, ev := range events {
|
||||
switch ev.Type {
|
||||
case EvGoUnblock:
|
||||
g := gs[ev.Args[0]]
|
||||
g.wakeup = []*Event{ev}
|
||||
gs[ev.Args[0]] = g
|
||||
case EvGoStart, EvGoPreempt, EvFutileWakeup:
|
||||
g := gs[ev.G]
|
||||
g.wakeup = append(g.wakeup, ev)
|
||||
if ev.Type == EvFutileWakeup {
|
||||
g.futile = true
|
||||
}
|
||||
gs[ev.G] = g
|
||||
case EvGoBlock, EvGoBlockSend, EvGoBlockRecv, EvGoBlockSelect, EvGoBlockSync, EvGoBlockCond:
|
||||
g := gs[ev.G]
|
||||
if g.futile {
|
||||
futile[ev] = true
|
||||
for _, ev1 := range g.wakeup {
|
||||
futile[ev1] = true
|
||||
}
|
||||
}
|
||||
delete(gs, ev.G)
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: remove futile wakeup sequences.
|
||||
newEvents := events[:0] // overwrite the original slice
|
||||
for _, ev := range events {
|
||||
if !futile[ev] {
|
||||
newEvents = append(newEvents, ev)
|
||||
}
|
||||
}
|
||||
return newEvents, nil
|
||||
}
|
||||
|
||||
// postProcessTrace does inter-event verification and information restoration.
|
||||
// The resulting trace is guaranteed to be consistent
|
||||
// (for example, a P does not run two Gs at the same time, or a G is indeed
|
||||
@ -618,7 +677,8 @@ const (
|
||||
EvHeapAlloc = 33 // memstats.heap_alloc change [timestamp, heap_alloc]
|
||||
EvNextGC = 34 // memstats.next_gc change [timestamp, next_gc]
|
||||
EvTimerGoroutine = 35 // denotes timer goroutine [timer goroutine id]
|
||||
EvCount = 36
|
||||
EvFutileWakeup = 36 // denotes that the revious wakeup of this goroutine was futile [timestamp]
|
||||
EvCount = 37
|
||||
)
|
||||
|
||||
var EventDescriptions = [EvCount]struct {
|
||||
@ -662,4 +722,5 @@ var EventDescriptions = [EvCount]struct {
|
||||
EvHeapAlloc: {"HeapAlloc", false, []string{"mem"}},
|
||||
EvNextGC: {"NextGC", false, []string{"mem"}},
|
||||
EvTimerGoroutine: {"TimerGoroutine", false, []string{"g"}},
|
||||
EvFutileWakeup: {"FutileWakeup", false, []string{}},
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
|
||||
// asynchronous channel
|
||||
// wait for some space to write our data
|
||||
var t1 int64
|
||||
for c.qcount >= c.dataqsiz {
|
||||
for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
|
||||
if !block {
|
||||
unlock(&c.lock)
|
||||
return false
|
||||
@ -234,7 +234,7 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
|
||||
mysg.elem = nil
|
||||
mysg.selectdone = nil
|
||||
c.sendq.enqueue(mysg)
|
||||
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
|
||||
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)
|
||||
|
||||
// someone woke us up - try again
|
||||
if mysg.releasetime > 0 {
|
||||
@ -462,7 +462,7 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
|
||||
// asynchronous channel
|
||||
// wait for some data to appear
|
||||
var t1 int64
|
||||
for c.qcount <= 0 {
|
||||
for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
|
||||
if c.closed != 0 {
|
||||
selected, received = recvclosed(c, ep)
|
||||
if t1 > 0 {
|
||||
@ -488,7 +488,7 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
|
||||
mysg.selectdone = nil
|
||||
|
||||
c.recvq.enqueue(mysg)
|
||||
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
|
||||
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)
|
||||
|
||||
// someone woke us up - try again
|
||||
if mysg.releasetime > 0 {
|
||||
|
@ -360,3 +360,91 @@ func TestTraceStressStartStop(t *testing.T) {
|
||||
}
|
||||
<-outerDone
|
||||
}
|
||||
|
||||
func TestTraceFutileWakeup(t *testing.T) {
|
||||
// The test generates a full-load of futile wakeups on channels,
|
||||
// and ensures that the trace is consistent after their removal.
|
||||
skipTraceTestsIfNeeded(t)
|
||||
buf := new(bytes.Buffer)
|
||||
if err := StartTrace(buf); err != nil {
|
||||
t.Fatalf("failed to start tracing: %v", err)
|
||||
}
|
||||
|
||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(8))
|
||||
c0 := make(chan int, 1)
|
||||
c1 := make(chan int, 1)
|
||||
c2 := make(chan int, 1)
|
||||
const procs = 2
|
||||
var done sync.WaitGroup
|
||||
done.Add(4 * procs)
|
||||
for p := 0; p < procs; p++ {
|
||||
const iters = 1e3
|
||||
go func() {
|
||||
for i := 0; i < iters; i++ {
|
||||
runtime.Gosched()
|
||||
c0 <- 0
|
||||
}
|
||||
done.Done()
|
||||
}()
|
||||
go func() {
|
||||
for i := 0; i < iters; i++ {
|
||||
runtime.Gosched()
|
||||
<-c0
|
||||
}
|
||||
done.Done()
|
||||
}()
|
||||
go func() {
|
||||
for i := 0; i < iters; i++ {
|
||||
runtime.Gosched()
|
||||
select {
|
||||
case c1 <- 0:
|
||||
case c2 <- 0:
|
||||
}
|
||||
}
|
||||
done.Done()
|
||||
}()
|
||||
go func() {
|
||||
for i := 0; i < iters; i++ {
|
||||
runtime.Gosched()
|
||||
select {
|
||||
case <-c1:
|
||||
case <-c2:
|
||||
}
|
||||
}
|
||||
done.Done()
|
||||
}()
|
||||
}
|
||||
done.Wait()
|
||||
|
||||
StopTrace()
|
||||
events, _, err := parseTrace(buf)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to parse trace: %v", err)
|
||||
}
|
||||
// Check that (1) trace does not contain EvFutileWakeup events and
|
||||
// (2) there are no consecutive EvGoBlock/EvGCStart/EvGoBlock events
|
||||
// (we call runtime.Gosched between all operations, so these would be futile wakeups).
|
||||
gs := make(map[uint64]int)
|
||||
for _, ev := range events {
|
||||
switch ev.Type {
|
||||
case trace.EvFutileWakeup:
|
||||
t.Fatalf("found EvFutileWakeup event")
|
||||
case trace.EvGoBlockSend, trace.EvGoBlockRecv, trace.EvGoBlockSelect:
|
||||
if gs[ev.G] == 2 {
|
||||
t.Fatalf("goroutine %v blocked on %v at %v right after start",
|
||||
ev.G, trace.EventDescriptions[ev.Type].Name, ev.Ts)
|
||||
}
|
||||
if gs[ev.G] == 1 {
|
||||
t.Fatalf("goroutine %v blocked on %v at %v while blocked",
|
||||
ev.G, trace.EventDescriptions[ev.Type].Name, ev.Ts)
|
||||
}
|
||||
gs[ev.G] = 1
|
||||
case trace.EvGoStart:
|
||||
if gs[ev.G] == 1 {
|
||||
gs[ev.G] = 2
|
||||
}
|
||||
default:
|
||||
delete(gs, ev.G)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -308,6 +308,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) {
|
||||
k *scase
|
||||
sglist *sudog
|
||||
sgnext *sudog
|
||||
futile byte
|
||||
)
|
||||
|
||||
loop:
|
||||
@ -392,7 +393,7 @@ loop:
|
||||
|
||||
// wait for someone to wake us up
|
||||
gp.param = nil
|
||||
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2)
|
||||
gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
|
||||
|
||||
// someone woke us up
|
||||
sellock(sel)
|
||||
@ -435,6 +436,7 @@ loop:
|
||||
}
|
||||
|
||||
if cas == nil {
|
||||
futile = traceFutileWakeup
|
||||
goto loop
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,8 @@ const (
|
||||
traceEvHeapAlloc = 33 // memstats.heap_alloc change [timestamp, heap_alloc]
|
||||
traceEvNextGC = 34 // memstats.next_gc change [timestamp, next_gc]
|
||||
traceEvTimerGoroutine = 35 // denotes timer goroutine [timer goroutine id]
|
||||
traceEvCount = 36
|
||||
traceEvFutileWakeup = 36 // denotes that the previous wakeup of this goroutine was futile [timestamp]
|
||||
traceEvCount = 37
|
||||
)
|
||||
|
||||
const (
|
||||
@ -71,6 +72,13 @@ const (
|
||||
traceBytesPerNumber = 10
|
||||
// Shift of the number of arguments in the first event byte.
|
||||
traceArgCountShift = 6
|
||||
// Flag passed to traceGoPark to denote that the previous wakeup of this
|
||||
// goroutine was futile. For example, a goroutine was unblocked on a mutex,
|
||||
// but another goroutine got ahead and acquired the mutex before the first
|
||||
// goroutine is scheduled, so the first goroutine has to block again.
|
||||
// Such wakeups happen on buffered channels and sync.Mutex,
|
||||
// but are generally not interesting for end user.
|
||||
traceFutileWakeup byte = 128
|
||||
)
|
||||
|
||||
// trace is global tracing context.
|
||||
@ -775,7 +783,10 @@ func traceGoPreempt() {
|
||||
}
|
||||
|
||||
func traceGoPark(traceEv byte, skip int, gp *g) {
|
||||
traceEvent(traceEv, skip)
|
||||
if traceEv&traceFutileWakeup != 0 {
|
||||
traceEvent(traceEvFutileWakeup, -1)
|
||||
}
|
||||
traceEvent(traceEv & ^traceFutileWakeup, skip)
|
||||
}
|
||||
|
||||
func traceGoUnpark(gp *g, skip int) {
|
||||
|
Loading…
Reference in New Issue
Block a user