1
0
mirror of https://github.com/golang/go synced 2024-11-16 16:54:39 -07:00

internal/trace/v2: clean up the ordering interface

This change cleans up the ordering interface in several ways.

First, it resolves a TODO about using a proper queue of events for extra
events produced while performing ordering. This will be necessary for a
follow-up change to handle coroutine switch events.

Next, it simplifies the ordering.advance method's signature by not
returning a schedCtx. Instead, ordering.advance will take responsibility
for constructing the final Event instead of the caller, and places it on
its own internal queue (in addition to any other Events generated). The
caller is then responsible for taking events off of the queue with a new
method Next.

Finally, hand-in-hand with the signature change, the implementation of
ordering.advance no longer forces each switch case to return but instead
has them converge past the switch. This has two effects. One is that we
eliminate the deferred call to update the M state. Using a defer here is
technically incorrect, because we might end up changing the M state even
if we don't advance the event! We got lucky here that curCtx == newCtx
in all such cases, but there may have been a subtle bug lurking here.

Unfortunately because of the queue's semantics however, we can't
actually avoid pushing into the queue at every possible successful exit
out of the switch. Hopefully this can become less error-prone in the
future by splitting up the switch into a dispatch of different
functions, instead of everything living in one giant function. This
cleanup will happen in a follow-up change.

Change-Id: Ifebbbf14e8ed5c08be5c1b0fadc2e5df3915c656
Reviewed-on: https://go-review.googlesource.com/c/go/+/565936
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
Auto-Submit: Michael Knyszek <mknyszek@google.com>
This commit is contained in:
Michael Anthony Knyszek 2024-02-20 20:58:18 +00:00 committed by Gopher Robot
parent 52b5f164ae
commit 7348773640
3 changed files with 284 additions and 176 deletions

View File

@ -15,6 +15,11 @@ import (
// ordering emulates Go scheduler state for both validation and
// for putting events in the right order.
//
// The interface to ordering consists of two methods: Advance
// and Next. Advance is called to try and advance an event and
// add completed events to the ordering. Next is used to pick
// off events in the ordering.
type ordering struct {
gStates map[GoID]*gState
pStates map[ProcID]*pState // TODO: The keys are dense, so this can be a slice.
@ -23,29 +28,10 @@ type ordering struct {
gcSeq uint64
gcState gcState
initialGen uint64
// Some events like GoDestroySyscall produce two events instead of one.
// extraEvent is this extra space. advance must not be called unless
// the extraEvent has been consumed with consumeExtraEvent.
//
// TODO(mknyszek): Replace this with a more formal queue.
extraEvent Event
queue queue[Event]
}
// consumeExtraEvent consumes the extra event.
func (o *ordering) consumeExtraEvent() Event {
if o.extraEvent.Kind() == EventBad {
return Event{}
}
r := o.extraEvent
o.extraEvent = Event{}
return r
}
// advance checks if it's valid to proceed with ev which came from thread m.
//
// Returns the schedCtx at the point of the event, whether it's OK to advance
// with this event, and any error encountered in validation.
// Advance checks if it's valid to proceed with ev which came from thread m.
//
// It assumes the gen value passed to it is monotonically increasing across calls.
//
@ -53,7 +39,11 @@ func (o *ordering) consumeExtraEvent() Event {
// If it's not valid to advance with ev, but no error was encountered, the caller
// should attempt to advance with other candidate events from other threads. If the
// caller runs out of candidates, the trace is invalid.
func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64) (schedCtx, bool, error) {
//
// If this returns true, Next is guaranteed to return a complete event. However,
// multiple events may be added to the ordering, so the caller should (but is not
// required to) continue to call Next until it is exhausted.
func (o *ordering) Advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64) (bool, error) {
if o.initialGen == 0 {
// Set the initial gen if necessary.
o.initialGen = gen
@ -63,13 +53,15 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
curCtx.M = m
newCtx.M = m
var ms *mState
if m == NoThread {
curCtx.P = NoProc
curCtx.G = NoGoroutine
newCtx = curCtx
} else {
// Pull out or create the mState for this event.
ms, ok := o.mStates[m]
var ok bool
ms, ok = o.mStates[m]
if !ok {
ms = &mState{
g: NoGoroutine,
@ -80,11 +72,11 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
curCtx.P = ms.p
curCtx.G = ms.g
newCtx = curCtx
defer func() {
// Update the mState for this event.
ms.p = newCtx.P
ms.g = newCtx.G
}()
}
// Generates an event from the current context.
currentEvent := func() Event {
return Event{table: evt, ctx: curCtx, base: *ev}
}
switch typ := ev.typ; typ {
@ -93,7 +85,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
pid := ProcID(ev.args[0])
status := go122.ProcStatus(ev.args[1])
if int(status) >= len(go122ProcStatus2ProcState) {
return curCtx, false, fmt.Errorf("invalid status for proc %d: %d", pid, status)
return false, fmt.Errorf("invalid status for proc %d: %d", pid, status)
}
oldState := go122ProcStatus2ProcState[status]
if s, ok := o.pStates[pid]; ok {
@ -110,7 +102,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
oldState = ProcIdle
ev.args[1] = uint64(go122.ProcSyscallAbandoned)
} else if s.status != status {
return curCtx, false, fmt.Errorf("inconsistent status for proc %d: old %v vs. new %v", pid, s.status, status)
return false, fmt.Errorf("inconsistent status for proc %d: old %v vs. new %v", pid, s.status, status)
}
s.seq = makeSeq(gen, 0) // Reset seq.
} else {
@ -145,10 +137,10 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
}
}
if !found {
return curCtx, false, fmt.Errorf("failed to find sched context for proc %d that's about to be stolen", pid)
return false, fmt.Errorf("failed to find sched context for proc %d that's about to be stolen", pid)
}
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvProcStart:
pid := ProcID(ev.args[0])
seq := makeSeq(gen, ev.args[1])
@ -163,19 +155,19 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// got to the right point in the trace.
//
// Note that we also don't advance here if we have a P and we're in a syscall.
return curCtx, false, nil
return false, nil
}
// We can advance this P. Check some invariants.
//
// We might have a goroutine if a goroutine is exiting a syscall.
reqs := event.SchedReqs{Thread: event.MustHave, Proc: event.MustNotHave, Goroutine: event.MayHave}
if err := validateCtx(curCtx, reqs); err != nil {
return curCtx, false, err
return false, err
}
state.status = go122.ProcRunning
state.seq = seq
newCtx.P = pid
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvProcStop:
// We must be able to advance this P.
//
@ -188,18 +180,18 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// ProcStop doesn't need a sequence number.
state, ok := o.pStates[curCtx.P]
if !ok {
return curCtx, false, fmt.Errorf("event %s for proc (%v) that doesn't exist", go122.EventString(typ), curCtx.P)
return false, fmt.Errorf("event %s for proc (%v) that doesn't exist", go122.EventString(typ), curCtx.P)
}
if state.status != go122.ProcRunning && state.status != go122.ProcSyscall {
return curCtx, false, fmt.Errorf("%s event for proc that's not %s or %s", go122.EventString(typ), go122.ProcRunning, go122.ProcSyscall)
return false, fmt.Errorf("%s event for proc that's not %s or %s", go122.EventString(typ), go122.ProcRunning, go122.ProcSyscall)
}
reqs := event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MayHave}
if err := validateCtx(curCtx, reqs); err != nil {
return curCtx, false, err
return false, err
}
state.status = go122.ProcIdle
newCtx.P = NoProc
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvProcSteal:
pid := ProcID(ev.args[0])
seq := makeSeq(gen, ev.args[1])
@ -208,12 +200,12 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// We can't make an inference as to whether this is bad. We could just be seeing
// a ProcStart on a different M before the proc's state was emitted, or before we
// got to the right point in the trace.
return curCtx, false, nil
return false, nil
}
// We can advance this P. Check some invariants.
reqs := event.SchedReqs{Thread: event.MustHave, Proc: event.MayHave, Goroutine: event.MayHave}
if err := validateCtx(curCtx, reqs); err != nil {
return curCtx, false, err
return false, err
}
// Smuggle in the P state that let us advance so we can surface information to the event.
// Specifically, we need to make sure that the event is interpreted not as a transition of
@ -231,7 +223,8 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// If we've lost information then don't try to do anything with the M.
// It may have moved on and we can't be sure.
if oldStatus == go122.ProcSyscallAbandoned {
return curCtx, true, nil
o.queue.push(currentEvent())
break
}
// Validate that the M we're stealing from is what we expect.
@ -240,21 +233,22 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
if mid == curCtx.M {
// We're stealing from ourselves. This behaves like a ProcStop.
if curCtx.P != pid {
return curCtx, false, fmt.Errorf("tried to self-steal proc %d (thread %d), but got proc %d instead", pid, mid, curCtx.P)
return false, fmt.Errorf("tried to self-steal proc %d (thread %d), but got proc %d instead", pid, mid, curCtx.P)
}
newCtx.P = NoProc
return curCtx, true, nil
o.queue.push(currentEvent())
break
}
// We're stealing from some other M.
mState, ok := o.mStates[mid]
if !ok {
return curCtx, false, fmt.Errorf("stole proc from non-existent thread %d", mid)
return false, fmt.Errorf("stole proc from non-existent thread %d", mid)
}
// Make sure we're actually stealing the right P.
if mState.p != pid {
return curCtx, false, fmt.Errorf("tried to steal proc %d from thread %d, but got proc %d instead", pid, mid, mState.p)
return false, fmt.Errorf("tried to steal proc %d from thread %d, but got proc %d instead", pid, mid, mState.p)
}
// Tell the M it has no P so it can proceed.
@ -264,7 +258,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// GoSyscallEndBlocked cannot advance until the corresponding
// M loses its P.
mState.p = NoProc
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle goroutines.
case go122.EvGoStatus:
@ -273,12 +267,12 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
status := go122.GoStatus(ev.args[2])
if int(status) >= len(go122GoStatus2GoState) {
return curCtx, false, fmt.Errorf("invalid status for goroutine %d: %d", gid, status)
return false, fmt.Errorf("invalid status for goroutine %d: %d", gid, status)
}
oldState := go122GoStatus2GoState[status]
if s, ok := o.gStates[gid]; ok {
if s.status != status {
return curCtx, false, fmt.Errorf("inconsistent status for goroutine %d: old %v vs. new %v", gid, s.status, status)
return false, fmt.Errorf("inconsistent status for goroutine %d: old %v vs. new %v", gid, s.status, status)
}
s.seq = makeSeq(gen, 0) // Reset seq.
} else if gen == o.initialGen {
@ -286,7 +280,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
o.gStates[gid] = &gState{id: gid, status: status, seq: makeSeq(gen, 0)}
oldState = GoUndetermined
} else {
return curCtx, false, fmt.Errorf("found goroutine status for new goroutine after the first generation: id=%v status=%v", gid, status)
return false, fmt.Errorf("found goroutine status for new goroutine after the first generation: id=%v status=%v", gid, status)
}
ev.extra(version.Go122)[0] = uint64(oldState) // Smuggle in the old state for StateTransition.
@ -296,7 +290,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
newCtx.G = gid
case go122.GoSyscall:
if mid == NoThread {
return curCtx, false, fmt.Errorf("found goroutine %d in syscall without a thread", gid)
return false, fmt.Errorf("found goroutine %d in syscall without a thread", gid)
}
// Is the syscall on this thread? If so, bind it to the context.
// Otherwise, we're talking about a G sitting in a syscall on an M.
@ -307,7 +301,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// binding occur already. Even if the G was blocked in a syscall
// for multiple generations since trace start, we would have seen
// a previous GoStatus event that bound the goroutine to an M.
return curCtx, false, fmt.Errorf("inconsistent thread for syscalling goroutine %d: thread has goroutine %d", gid, curCtx.G)
return false, fmt.Errorf("inconsistent thread for syscalling goroutine %d: thread has goroutine %d", gid, curCtx.G)
}
newCtx.G = gid
break
@ -323,7 +317,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// goroutine go into a syscall on this thread at some point.
if ms.g != gid {
// But the G on the M doesn't match. Something's wrong.
return curCtx, false, fmt.Errorf("inconsistent thread for syscalling goroutine %d: thread has goroutine %d", gid, ms.g)
return false, fmt.Errorf("inconsistent thread for syscalling goroutine %d: thread has goroutine %d", gid, ms.g)
}
// This case is just a Syscall->Syscall event, which needs to
// appear as having the G currently bound to this M.
@ -339,38 +333,38 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// Update the current context to the M we're talking about.
curCtx.M = mid
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoCreate:
// Goroutines must be created on a running P, but may or may not be created
// by a running goroutine.
reqs := event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MayHave}
if err := validateCtx(curCtx, reqs); err != nil {
return curCtx, false, err
return false, err
}
// If we have a goroutine, it must be running.
if state, ok := o.gStates[curCtx.G]; ok && state.status != go122.GoRunning {
return curCtx, false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
return false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
}
// This goroutine created another. Add a state for it.
newgid := GoID(ev.args[0])
if _, ok := o.gStates[newgid]; ok {
return curCtx, false, fmt.Errorf("tried to create goroutine (%v) that already exists", newgid)
return false, fmt.Errorf("tried to create goroutine (%v) that already exists", newgid)
}
o.gStates[newgid] = &gState{id: newgid, status: go122.GoRunnable, seq: makeSeq(gen, 0)}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoDestroy, go122.EvGoStop, go122.EvGoBlock:
// These are goroutine events that all require an active running
// goroutine on some thread. They must *always* be advance-able,
// since running goroutines are bound to their M.
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
state, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
return false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
}
if state.status != go122.GoRunning {
return curCtx, false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
return false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
}
// Handle each case slightly differently; we just group them together
// because they have shared preconditions.
@ -388,7 +382,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
state.status = go122.GoWaiting
newCtx.G = NoGoroutine
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoStart:
gid := GoID(ev.args[0])
seq := makeSeq(gen, ev.args[1])
@ -397,17 +391,17 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// We can't make an inference as to whether this is bad. We could just be seeing
// a GoStart on a different M before the goroutine was created, before it had its
// state emitted, or before we got to the right point in the trace yet.
return curCtx, false, nil
return false, nil
}
// We can advance this goroutine. Check some invariants.
reqs := event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MustNotHave}
if err := validateCtx(curCtx, reqs); err != nil {
return curCtx, false, err
return false, err
}
state.status = go122.GoRunning
state.seq = seq
newCtx.G = gid
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoUnblock:
// N.B. These both reference the goroutine to unblock, not the current goroutine.
gid := GoID(ev.args[0])
@ -417,31 +411,31 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// We can't make an inference as to whether this is bad. We could just be seeing
// a GoUnblock on a different M before the goroutine was created and blocked itself,
// before it had its state emitted, or before we got to the right point in the trace yet.
return curCtx, false, nil
return false, nil
}
state.status = go122.GoRunnable
state.seq = seq
// N.B. No context to validate. Basically anything can unblock
// a goroutine (e.g. sysmon).
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoSyscallBegin:
// Entering a syscall requires an active running goroutine with a
// proc on some thread. It is always advancable.
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
state, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
return false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
}
if state.status != go122.GoRunning {
return curCtx, false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
return false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
}
// Goroutine entered a syscall. It's still running on this P and M.
state.status = go122.GoSyscall
pState, ok := o.pStates[curCtx.P]
if !ok {
return curCtx, false, fmt.Errorf("uninitialized proc %d found during %s", curCtx.P, go122.EventString(typ))
return false, fmt.Errorf("uninitialized proc %d found during %s", curCtx.P, go122.EventString(typ))
}
pState.status = go122.ProcSyscall
// Validate the P sequence number on the event and advance it.
@ -457,36 +451,36 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// to back off and see if any other events will advance. This is a running P.
pSeq := makeSeq(gen, ev.args[0])
if !pSeq.succeeds(pState.seq) {
return curCtx, false, fmt.Errorf("failed to advance %s: can't make sequence: %s -> %s", go122.EventString(typ), pState.seq, pSeq)
return false, fmt.Errorf("failed to advance %s: can't make sequence: %s -> %s", go122.EventString(typ), pState.seq, pSeq)
}
pState.seq = pSeq
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoSyscallEnd:
// This event is always advance-able because it happens on the same
// thread that EvGoSyscallStart happened, and the goroutine can't leave
// that thread until its done.
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
state, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
return false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
}
if state.status != go122.GoSyscall {
return curCtx, false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
return false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
}
state.status = go122.GoRunning
// Transfer the P back to running from syscall.
pState, ok := o.pStates[curCtx.P]
if !ok {
return curCtx, false, fmt.Errorf("uninitialized proc %d found during %s", curCtx.P, go122.EventString(typ))
return false, fmt.Errorf("uninitialized proc %d found during %s", curCtx.P, go122.EventString(typ))
}
if pState.status != go122.ProcSyscall {
return curCtx, false, fmt.Errorf("expected proc %d in state %v, but got %v instead", curCtx.P, go122.ProcSyscall, pState.status)
return false, fmt.Errorf("expected proc %d in state %v, but got %v instead", curCtx.P, go122.ProcSyscall, pState.status)
}
pState.status = go122.ProcRunning
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoSyscallEndBlocked:
// This event becomes advanceable when its P is not in a syscall state
// (lack of a P altogether is also acceptable for advancing).
@ -501,43 +495,43 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
if curCtx.P != NoProc {
pState, ok := o.pStates[curCtx.P]
if !ok {
return curCtx, false, fmt.Errorf("uninitialized proc %d found during %s", curCtx.P, go122.EventString(typ))
return false, fmt.Errorf("uninitialized proc %d found during %s", curCtx.P, go122.EventString(typ))
}
if pState.status == go122.ProcSyscall {
return curCtx, false, nil
return false, nil
}
}
// As mentioned above, we may have a P here if we ProcStart
// before this event.
if err := validateCtx(curCtx, event.SchedReqs{Thread: event.MustHave, Proc: event.MayHave, Goroutine: event.MustHave}); err != nil {
return curCtx, false, err
return false, err
}
state, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
return false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
}
if state.status != go122.GoSyscall {
return curCtx, false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
return false, fmt.Errorf("%s event for goroutine that's not %s", go122.EventString(typ), GoRunning)
}
newCtx.G = NoGoroutine
state.status = go122.GoRunnable
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoCreateSyscall:
// This event indicates that a goroutine is effectively
// being created out of a cgo callback. Such a goroutine
// is 'created' in the syscall state.
if err := validateCtx(curCtx, event.SchedReqs{Thread: event.MustHave, Proc: event.MayHave, Goroutine: event.MustNotHave}); err != nil {
return curCtx, false, err
return false, err
}
// This goroutine is effectively being created. Add a state for it.
newgid := GoID(ev.args[0])
if _, ok := o.gStates[newgid]; ok {
return curCtx, false, fmt.Errorf("tried to create goroutine (%v) in syscall that already exists", newgid)
return false, fmt.Errorf("tried to create goroutine (%v) in syscall that already exists", newgid)
}
o.gStates[newgid] = &gState{id: newgid, status: go122.GoSyscall, seq: makeSeq(gen, 0)}
// Goroutine is executing. Bind it to the context.
newCtx.G = newgid
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGoDestroySyscall:
// This event indicates that a goroutine created for a
// cgo callback is disappearing, either because the callback
@ -556,15 +550,15 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// eagerly by the runtime, and it might get stolen back later
// (or never again, if the program is going to exit).
if err := validateCtx(curCtx, event.SchedReqs{Thread: event.MustHave, Proc: event.MayHave, Goroutine: event.MustHave}); err != nil {
return curCtx, false, err
return false, err
}
// Check to make sure the goroutine exists in the right state.
state, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
return false, fmt.Errorf("event %s for goroutine (%v) that doesn't exist", go122.EventString(typ), curCtx.G)
}
if state.status != go122.GoSyscall {
return curCtx, false, fmt.Errorf("%s event for goroutine that's not %v", go122.EventString(typ), GoSyscall)
return false, fmt.Errorf("%s event for goroutine that's not %v", go122.EventString(typ), GoSyscall)
}
// This goroutine is exiting itself.
delete(o.gStates, curCtx.G)
@ -574,17 +568,17 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
if curCtx.P != NoProc {
pState, ok := o.pStates[curCtx.P]
if !ok {
return curCtx, false, fmt.Errorf("found invalid proc %d during %s", curCtx.P, go122.EventString(typ))
return false, fmt.Errorf("found invalid proc %d during %s", curCtx.P, go122.EventString(typ))
}
if pState.status != go122.ProcSyscall {
return curCtx, false, fmt.Errorf("proc %d in unexpected state %s during %s", curCtx.P, pState.status, go122.EventString(typ))
return false, fmt.Errorf("proc %d in unexpected state %s during %s", curCtx.P, pState.status, go122.EventString(typ))
}
// See the go122-create-syscall-reuse-thread-id test case for more details.
pState.status = go122.ProcSyscallAbandoned
newCtx.P = NoProc
// Queue an extra self-ProcSteal event.
o.extraEvent = Event{
extra := Event{
table: evt,
ctx: curCtx,
base: baseEvent{
@ -592,10 +586,11 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
time: ev.time,
},
}
o.extraEvent.base.args[0] = uint64(curCtx.P)
o.extraEvent.base.extra(version.Go122)[0] = uint64(go122.ProcSyscall)
extra.base.args[0] = uint64(curCtx.P)
extra.base.extra(version.Go122)[0] = uint64(go122.ProcSyscall)
o.queue.push(extra)
}
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle tasks. Tasks are interesting because:
// - There's no Begin event required to reference a task.
@ -607,7 +602,7 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
case go122.EvUserTaskBegin:
id := TaskID(ev.args[0])
if _, ok := o.activeTasks[id]; ok {
return curCtx, false, fmt.Errorf("task ID conflict: %d", id)
return false, fmt.Errorf("task ID conflict: %d", id)
}
// Get the parent ID, but don't validate it. There's no guarantee
// we actually have information on whether it's active.
@ -625,10 +620,13 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
nameID := stringID(ev.args[2])
name, ok := evt.strings.get(nameID)
if !ok {
return curCtx, false, fmt.Errorf("invalid string ID %v for %v event", nameID, typ)
return false, fmt.Errorf("invalid string ID %v for %v event", nameID, typ)
}
o.activeTasks[id] = taskState{name: name, parentID: parentID}
return curCtx, true, validateCtx(curCtx, event.UserGoReqs)
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return false, err
}
o.queue.push(currentEvent())
case go122.EvUserTaskEnd:
id := TaskID(ev.args[0])
if ts, ok := o.activeTasks[id]; ok {
@ -643,45 +641,48 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
ev.extra(version.Go122)[0] = uint64(NoTask)
ev.extra(version.Go122)[1] = uint64(evt.addExtraString(""))
}
return curCtx, true, validateCtx(curCtx, event.UserGoReqs)
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return false, err
}
o.queue.push(currentEvent())
// Handle user regions.
case go122.EvUserRegionBegin:
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
tid := TaskID(ev.args[0])
nameID := stringID(ev.args[1])
name, ok := evt.strings.get(nameID)
if !ok {
return curCtx, false, fmt.Errorf("invalid string ID %v for %v event", nameID, typ)
return false, fmt.Errorf("invalid string ID %v for %v event", nameID, typ)
}
gState, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("encountered EvUserRegionBegin without known state for current goroutine %d", curCtx.G)
return false, fmt.Errorf("encountered EvUserRegionBegin without known state for current goroutine %d", curCtx.G)
}
if err := gState.beginRegion(userRegion{tid, name}); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvUserRegionEnd:
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
tid := TaskID(ev.args[0])
nameID := stringID(ev.args[1])
name, ok := evt.strings.get(nameID)
if !ok {
return curCtx, false, fmt.Errorf("invalid string ID %v for %v event", nameID, typ)
return false, fmt.Errorf("invalid string ID %v for %v event", nameID, typ)
}
gState, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("encountered EvUserRegionEnd without known state for current goroutine %d", curCtx.G)
return false, fmt.Errorf("encountered EvUserRegionEnd without known state for current goroutine %d", curCtx.G)
}
if err := gState.endRegion(userRegion{tid, name}); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle the GC mark phase.
//
@ -694,86 +695,88 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
seq := ev.args[0]
if gen == o.initialGen {
if o.gcState != gcUndetermined {
return curCtx, false, fmt.Errorf("GCActive in the first generation isn't first GC event")
return false, fmt.Errorf("GCActive in the first generation isn't first GC event")
}
o.gcSeq = seq
o.gcState = gcRunning
return curCtx, true, nil
o.queue.push(currentEvent())
break
}
if seq != o.gcSeq+1 {
// This is not the right GC cycle.
return curCtx, false, nil
return false, nil
}
if o.gcState != gcRunning {
return curCtx, false, fmt.Errorf("encountered GCActive while GC was not in progress")
return false, fmt.Errorf("encountered GCActive while GC was not in progress")
}
o.gcSeq = seq
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGCBegin:
seq := ev.args[0]
if o.gcState == gcUndetermined {
o.gcSeq = seq
o.gcState = gcRunning
return curCtx, true, nil
o.queue.push(currentEvent())
break
}
if seq != o.gcSeq+1 {
// This is not the right GC cycle.
return curCtx, false, nil
return false, nil
}
if o.gcState == gcRunning {
return curCtx, false, fmt.Errorf("encountered GCBegin while GC was already in progress")
return false, fmt.Errorf("encountered GCBegin while GC was already in progress")
}
o.gcSeq = seq
o.gcState = gcRunning
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGCEnd:
seq := ev.args[0]
if seq != o.gcSeq+1 {
// This is not the right GC cycle.
return curCtx, false, nil
return false, nil
}
if o.gcState == gcNotRunning {
return curCtx, false, fmt.Errorf("encountered GCEnd when GC was not in progress")
return false, fmt.Errorf("encountered GCEnd when GC was not in progress")
}
if o.gcState == gcUndetermined {
return curCtx, false, fmt.Errorf("encountered GCEnd when GC was in an undetermined state")
return false, fmt.Errorf("encountered GCEnd when GC was in an undetermined state")
}
o.gcSeq = seq
o.gcState = gcNotRunning
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle simple instantaneous events that require a G.
case go122.EvGoLabel, go122.EvProcsChange, go122.EvUserLog:
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle allocation states, which don't require a G.
case go122.EvHeapAlloc, go122.EvHeapGoal:
if err := validateCtx(curCtx, event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MayHave}); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle sweep, which is bound to a P and doesn't require a G.
case go122.EvGCSweepBegin:
if err := validateCtx(curCtx, event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MayHave}); err != nil {
return curCtx, false, err
return false, err
}
if err := o.pStates[curCtx.P].beginRange(makeRangeType(typ, 0)); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGCSweepActive:
pid := ProcID(ev.args[0])
// N.B. In practice Ps can't block while they're sweeping, so this can only
@ -782,26 +785,26 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// in the middle of a sweep.
pState, ok := o.pStates[pid]
if !ok {
return curCtx, false, fmt.Errorf("encountered GCSweepActive for unknown proc %d", pid)
return false, fmt.Errorf("encountered GCSweepActive for unknown proc %d", pid)
}
if err := pState.activeRange(makeRangeType(typ, 0), gen == o.initialGen); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGCSweepEnd:
if err := validateCtx(curCtx, event.SchedReqs{Thread: event.MustHave, Proc: event.MustHave, Goroutine: event.MayHave}); err != nil {
return curCtx, false, err
return false, err
}
_, err := o.pStates[curCtx.P].endRange(typ)
if err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
// Handle special goroutine-bound event ranges.
case go122.EvSTWBegin, go122.EvGCMarkAssistBegin:
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
desc := stringID(0)
if typ == go122.EvSTWBegin {
@ -809,12 +812,12 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
}
gState, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("encountered event of type %d without known state for current goroutine %d", typ, curCtx.G)
return false, fmt.Errorf("encountered event of type %d without known state for current goroutine %d", typ, curCtx.G)
}
if err := gState.beginRange(makeRangeType(typ, desc)); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvGCMarkAssistActive:
gid := GoID(ev.args[0])
// N.B. Like GoStatus, this can happen at any time, because it can
@ -822,32 +825,44 @@ func (o *ordering) advance(ev *baseEvent, evt *evTable, m ThreadID, gen uint64)
// current scheduler context.
gState, ok := o.gStates[gid]
if !ok {
return curCtx, false, fmt.Errorf("uninitialized goroutine %d found during %s", gid, go122.EventString(typ))
return false, fmt.Errorf("uninitialized goroutine %d found during %s", gid, go122.EventString(typ))
}
if err := gState.activeRange(makeRangeType(typ, 0), gen == o.initialGen); err != nil {
return curCtx, false, err
return false, err
}
return curCtx, true, nil
o.queue.push(currentEvent())
case go122.EvSTWEnd, go122.EvGCMarkAssistEnd:
if err := validateCtx(curCtx, event.UserGoReqs); err != nil {
return curCtx, false, err
return false, err
}
gState, ok := o.gStates[curCtx.G]
if !ok {
return curCtx, false, fmt.Errorf("encountered event of type %d without known state for current goroutine %d", typ, curCtx.G)
return false, fmt.Errorf("encountered event of type %d without known state for current goroutine %d", typ, curCtx.G)
}
desc, err := gState.endRange(typ)
if err != nil {
return curCtx, false, err
return false, err
}
if typ == go122.EvSTWEnd {
// Smuggle the kind into the event.
// Don't use ev.extra here so we have symmetry with STWBegin.
ev.args[0] = uint64(desc)
}
return curCtx, true, nil
o.queue.push(currentEvent())
default:
return false, fmt.Errorf("bad event type found while ordering: %v", ev.typ)
}
return curCtx, false, fmt.Errorf("bad event type found while ordering: %v", ev.typ)
if ms != nil {
// Update the mState for this event.
ms.p = newCtx.P
ms.g = newCtx.G
}
return true, nil
}
// Next returns the next event in the ordering.
func (o *ordering) Next() (Event, bool) {
return o.queue.pop()
}
// schedCtx represents the scheduling resources associated with an event.
@ -1092,3 +1107,51 @@ type taskState struct {
// parentID is the parent ID of the active task.
parentID TaskID
}
// queue implements a growable ring buffer with a queue API.
type queue[T any] struct {
start, end int
buf []T
}
// push adds a new event to the back of the queue.
func (q *queue[T]) push(value T) {
if q.end-q.start == len(q.buf) {
q.grow()
}
q.buf[q.end%len(q.buf)] = value
q.end++
}
// grow increases the size of the queue.
func (q *queue[T]) grow() {
if len(q.buf) == 0 {
q.buf = make([]T, 2)
return
}
// Create new buf and copy data over.
newBuf := make([]T, len(q.buf)*2)
pivot := q.start % len(q.buf)
first, last := q.buf[pivot:], q.buf[:pivot]
copy(newBuf[:len(first)], first)
copy(newBuf[len(first):], last)
// Update the queue state.
q.start = 0
q.end = len(q.buf)
q.buf = newBuf
}
// pop removes an event from the front of the queue. If the
// queue is empty, it returns an EventBad event.
func (q *queue[T]) pop() (T, bool) {
if q.end-q.start == 0 {
return *new(T), false
}
elem := &q.buf[q.start%len(q.buf)]
value := *elem
*elem = *new(T) // Clear the entry before returning, so we don't hold onto old tables.
q.start++
return value, true
}

View File

@ -0,0 +1,34 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import "testing"
func TestQueue(t *testing.T) {
var q queue[int]
check := func(name string, exp []int) {
for _, v := range exp {
q.push(v)
}
for i, want := range exp {
if got, ok := q.pop(); !ok {
t.Fatalf("check %q: expected to be able to pop after %d pops", name, i+1)
} else if got != want {
t.Fatalf("check %q: expected value %d after on pop %d, got %d", name, want, i+1, got)
}
}
if _, ok := q.pop(); ok {
t.Fatalf("check %q: did not expect to be able to pop more values", name)
}
if _, ok := q.pop(); ok {
t.Fatalf("check %q: did not expect to be able to pop more values a second time", name)
}
}
check("one element", []int{4})
check("two elements", []int{64, 12})
check("six elements", []int{55, 16423, 2352, 644, 12874, 9372})
check("one element again", []int{7})
check("two elements again", []int{77, 6336})
}

View File

@ -85,8 +85,8 @@ func (r *Reader) ReadEvent() (e Event, err error) {
r.lastTs = e.base.time
}()
// Consume any extra events produced during parsing.
if ev := r.order.consumeExtraEvent(); ev.Kind() != EventBad {
// Consume any events in the ordering first.
if ev, ok := r.order.Next(); ok {
return ev, nil
}
@ -130,13 +130,17 @@ func (r *Reader) ReadEvent() (e Event, err error) {
// Reset emittedSync.
r.emittedSync = false
}
refresh := func(i int) error {
tryAdvance := func(i int) (bool, error) {
bc := r.frontier[i]
if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
return ok, err
}
// Refresh the cursor's event.
ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
if err != nil {
return err
return false, err
}
if ok {
// If we successfully refreshed, update the heap.
@ -145,7 +149,7 @@ func (r *Reader) ReadEvent() (e Event, err error) {
// There's nothing else to read. Delete this cursor from the frontier.
r.frontier = heapRemove(r.frontier, i)
}
return nil
return true, nil
}
// Inject a CPU sample if it comes next.
if len(r.cpuSamples) != 0 {
@ -160,28 +164,35 @@ func (r *Reader) ReadEvent() (e Event, err error) {
if len(r.frontier) == 0 {
return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
}
bc := r.frontier[0]
if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
if ok, err := tryAdvance(0); err != nil {
return Event{}, err
} else if ok {
e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
return e, refresh(0)
}
// Sort the min-heap. A sorted min-heap is still a min-heap,
// but now we can iterate over the rest and try to advance in
// order. This path should be rare.
slices.SortFunc(r.frontier, (*batchCursor).compare)
// Try to advance the rest of the frontier, in timestamp order.
for i := 1; i < len(r.frontier); i++ {
bc := r.frontier[i]
if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
return Event{}, err
} else if ok {
e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
return e, refresh(i)
} else if !ok {
// Try to advance the rest of the frontier, in timestamp order.
//
// To do this, sort the min-heap. A sorted min-heap is still a
// min-heap, but now we can iterate over the rest and try to
// advance in order. This path should be rare.
slices.SortFunc(r.frontier, (*batchCursor).compare)
success := false
for i := 1; i < len(r.frontier); i++ {
if ok, err = tryAdvance(i); err != nil {
return Event{}, err
} else if ok {
success = true
break
}
}
if !success {
return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
}
}
return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
// Pick off the next event on the queue. At this point, one must exist.
ev, ok := r.order.Next()
if !ok {
panic("invariant violation: advance successful, but queue is empty")
}
return ev, nil
}
func dumpFrontier(frontier []*batchCursor) string {