mirror of
https://github.com/golang/go
synced 2024-11-18 16:14:46 -07:00
internal/telemetry: allow ProcessEvent to modify the event
This allows early exporters to adjust the event for later ones. This is used to lookup key values from the context if needed. Also add a Query type event which is intended to perform all event modifications but nothing else, and is used to lookup values from the context. This cleans up a weirdness where the current lookup presumes there will be an exporter with a matching mechanism. Change-Id: I835d1e0b2511553c30f94b7becfe7b7b5462c111 Reviewed-on: https://go-review.googlesource.com/c/tools/+/223657 Run-TryBot: Ian Cottrell <iancottrell@google.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
This commit is contained in:
parent
9fceb114c3
commit
cb106d260e
@ -542,24 +542,24 @@ func (i *Instance) writeMemoryDebug(threshold uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *exporter) ProcessEvent(ctx context.Context, ev event.Event) context.Context {
|
||||
ctx = export.ContextSpan(ctx, ev)
|
||||
func (e *exporter) ProcessEvent(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
ctx, ev = export.ContextSpan(ctx, ev)
|
||||
i := GetInstance(ctx)
|
||||
if ev.IsLog() && (ev.Error != nil || i == nil) {
|
||||
fmt.Fprintf(e.stderr, "%v\n", ev)
|
||||
}
|
||||
ctx = protocol.LogEvent(ctx, ev)
|
||||
ctx, ev = protocol.LogEvent(ctx, ev)
|
||||
if i == nil {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
ctx = export.Tag(ctx, ev)
|
||||
ctx, ev = export.Tag(ctx, ev)
|
||||
if i.ocagent != nil {
|
||||
ctx = i.ocagent.ProcessEvent(ctx, ev)
|
||||
ctx, ev = i.ocagent.ProcessEvent(ctx, ev)
|
||||
}
|
||||
if i.traces != nil {
|
||||
ctx = i.traces.ProcessEvent(ctx, ev)
|
||||
ctx, ev = i.traces.ProcessEvent(ctx, ev)
|
||||
}
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
||||
func (e *exporter) Metric(ctx context.Context, data event.MetricData) {
|
||||
|
@ -73,12 +73,12 @@ type traceEvent struct {
|
||||
Tags string
|
||||
}
|
||||
|
||||
func (t *traces) ProcessEvent(ctx context.Context, ev event.Event) context.Context {
|
||||
func (t *traces) ProcessEvent(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
span := export.GetSpan(ctx)
|
||||
if span == nil {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
||||
switch {
|
||||
@ -99,13 +99,13 @@ func (t *traces) ProcessEvent(ctx context.Context, ev event.Event) context.Conte
|
||||
t.unfinished[span.ID] = td
|
||||
// and wire up parents if we have them
|
||||
if !span.ParentID.IsValid() {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
parentID := export.SpanContext{TraceID: span.ID.TraceID, SpanID: span.ParentID}
|
||||
parent, found := t.unfinished[parentID]
|
||||
if !found {
|
||||
// trace had an invalid parent, so it cannot itself be valid
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
parent.Children = append(parent.Children, td)
|
||||
|
||||
@ -113,7 +113,7 @@ func (t *traces) ProcessEvent(ctx context.Context, ev event.Event) context.Conte
|
||||
// finishing, must be already in the map
|
||||
td, found := t.unfinished[span.ID]
|
||||
if !found {
|
||||
return ctx // if this happens we are in a bad place
|
||||
return ctx, ev // if this happens we are in a bad place
|
||||
}
|
||||
delete(t.unfinished, span.ID)
|
||||
|
||||
@ -140,7 +140,7 @@ func (t *traces) ProcessEvent(ctx context.Context, ev event.Event) context.Conte
|
||||
fillOffsets(td, td.Start)
|
||||
}
|
||||
}
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
||||
func (t *traces) getData(req *http.Request) interface{} {
|
||||
|
@ -201,7 +201,7 @@ func (s *Server) publishReports(ctx context.Context, snapshot source.Snapshot, r
|
||||
Version: key.id.Version,
|
||||
}); err != nil {
|
||||
if ctx.Err() == nil {
|
||||
event.Error(ctx, "publishReports: failed to deliver diagnostic", err, telemetry.File.From(ctx))
|
||||
event.Error(ctx, "publishReports: failed to deliver diagnostic", err, telemetry.URI.Of(key.id.URI))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -18,18 +18,18 @@ func WithClient(ctx context.Context, client Client) context.Context {
|
||||
return context.WithValue(ctx, clientKey, client)
|
||||
}
|
||||
|
||||
func LogEvent(ctx context.Context, ev event.Event) context.Context {
|
||||
func LogEvent(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
if !ev.IsLog() {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
client, ok := ctx.Value(clientKey).(Client)
|
||||
if !ok {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
msg := &LogMessageParams{Type: Info, Message: fmt.Sprint(ev)}
|
||||
if ev.Error != nil {
|
||||
msg.Type = Error
|
||||
}
|
||||
go client.LogMessage(xcontext.Detach(ctx), msg)
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
@ -154,8 +154,8 @@ func newExporter() *loggingExporter {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *loggingExporter) ProcessEvent(ctx context.Context, ev event.Event) context.Context {
|
||||
export.ContextSpan(ctx, ev)
|
||||
func (e *loggingExporter) ProcessEvent(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
ctx, ev = export.ContextSpan(ctx, ev)
|
||||
return e.logger.ProcessEvent(ctx, ev)
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ const (
|
||||
StartSpanType
|
||||
EndSpanType
|
||||
LabelType
|
||||
QueryType
|
||||
DetachType
|
||||
)
|
||||
|
||||
@ -31,7 +32,8 @@ type Event struct {
|
||||
func (e Event) IsLog() bool { return e.Type == LogType }
|
||||
func (e Event) IsEndSpan() bool { return e.Type == EndSpanType }
|
||||
func (e Event) IsStartSpan() bool { return e.Type == StartSpanType }
|
||||
func (e Event) IsTag() bool { return e.Type == LabelType }
|
||||
func (e Event) IsLabel() bool { return e.Type == LabelType }
|
||||
func (e Event) IsQuery() bool { return e.Type == QueryType }
|
||||
func (e Event) IsDetach() bool { return e.Type == DetachType }
|
||||
|
||||
func (e Event) Format(f fmt.State, r rune) {
|
||||
|
@ -16,7 +16,7 @@ type Exporter interface {
|
||||
// along with the context in which that event ocurred.
|
||||
// This method is called synchronously from the event call site, so it should
|
||||
// return quickly so as not to hold up user code.
|
||||
ProcessEvent(context.Context, Event) context.Context
|
||||
ProcessEvent(context.Context, Event) (context.Context, Event)
|
||||
|
||||
Metric(context.Context, MetricData)
|
||||
}
|
||||
@ -37,13 +37,13 @@ func SetExporter(e Exporter) {
|
||||
atomic.StorePointer(&exporter, p)
|
||||
}
|
||||
|
||||
func ProcessEvent(ctx context.Context, event Event) context.Context {
|
||||
func ProcessEvent(ctx context.Context, ev Event) (context.Context, Event) {
|
||||
exporterPtr := (*Exporter)(atomic.LoadPointer(&exporter))
|
||||
if exporterPtr == nil {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
// and now also hand the event of to the current exporter
|
||||
return (*exporterPtr).ProcessEvent(ctx, event)
|
||||
return (*exporterPtr).ProcessEvent(ctx, ev)
|
||||
}
|
||||
|
||||
func Metric(ctx context.Context, data MetricData) {
|
||||
|
@ -4,10 +4,6 @@
|
||||
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// Key is used as the identity of a Tag.
|
||||
// Keys are intended to be compared by pointer only, the name should be unique
|
||||
// for communicating with external systems, but it is not required or enforced.
|
||||
@ -27,13 +23,3 @@ func TagOf(name string, value interface{}) Tag {
|
||||
func (k *Key) Of(v interface{}) Tag {
|
||||
return Tag{Key: k, Value: v}
|
||||
}
|
||||
|
||||
// From can be used to get a tag for the key from a context.
|
||||
func (k *Key) From(ctx context.Context) Tag {
|
||||
return Tag{Key: k, Value: ctx.Value(k)}
|
||||
}
|
||||
|
||||
// With is a wrapper over the Label package level function for just this key.
|
||||
func (k *Key) With(ctx context.Context, v interface{}) context.Context {
|
||||
return Label(ctx, Tag{Key: k, Value: v})
|
||||
}
|
||||
|
@ -11,9 +11,24 @@ import (
|
||||
|
||||
// Label sends a label event to the exporter with the supplied tags.
|
||||
func Label(ctx context.Context, tags ...Tag) context.Context {
|
||||
return ProcessEvent(ctx, Event{
|
||||
ctx, _ = ProcessEvent(ctx, Event{
|
||||
Type: LabelType,
|
||||
At: time.Now(),
|
||||
Tags: tags,
|
||||
})
|
||||
return ctx
|
||||
}
|
||||
|
||||
// Query sends a query event to the exporter with the supplied keys.
|
||||
// The returned tags will have up to date values if the exporter supports it.
|
||||
func Query(ctx context.Context, keys ...*Key) TagList {
|
||||
tags := make(TagList, len(keys))
|
||||
for i, k := range keys {
|
||||
tags[i].Key = k
|
||||
}
|
||||
_, ev := ProcessEvent(ctx, Event{
|
||||
Type: QueryType,
|
||||
Tags: tags,
|
||||
})
|
||||
return ev.Tags
|
||||
}
|
||||
|
@ -5,7 +5,6 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
@ -26,15 +25,6 @@ func (t Tag) Format(f fmt.State, r rune) {
|
||||
fmt.Fprintf(f, `%v="%v"`, t.Key.Name, t.Value)
|
||||
}
|
||||
|
||||
// Tags collects a set of values from the context and returns them as a tag list.
|
||||
func Tags(ctx context.Context, keys ...*Key) TagList {
|
||||
tags := make(TagList, len(keys))
|
||||
for i, key := range keys {
|
||||
tags[i] = Tag{Key: key, Value: ctx.Value(key)}
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
// Get will get a single key's value from the list.
|
||||
func (l TagList) Get(k interface{}) interface{} {
|
||||
for _, t := range l {
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
func StartSpan(ctx context.Context, name string, tags ...Tag) (context.Context, func()) {
|
||||
ctx = ProcessEvent(ctx, Event{
|
||||
ctx, _ = ProcessEvent(ctx, Event{
|
||||
Type: StartSpanType,
|
||||
Message: name,
|
||||
At: time.Now(),
|
||||
@ -27,8 +27,9 @@ func StartSpan(ctx context.Context, name string, tags ...Tag) (context.Context,
|
||||
// Detach returns a context without an associated span.
|
||||
// This allows the creation of spans that are not children of the current span.
|
||||
func Detach(ctx context.Context) context.Context {
|
||||
return ProcessEvent(ctx, Event{
|
||||
ctx, _ = ProcessEvent(ctx, Event{
|
||||
Type: DetachType,
|
||||
At: time.Now(),
|
||||
})
|
||||
return ctx
|
||||
}
|
||||
|
@ -30,11 +30,11 @@ type logWriter struct {
|
||||
onlyErrors bool
|
||||
}
|
||||
|
||||
func (w *logWriter) ProcessEvent(ctx context.Context, ev event.Event) context.Context {
|
||||
func (w *logWriter) ProcessEvent(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
switch {
|
||||
case ev.IsLog():
|
||||
if w.onlyErrors && ev.Error == nil {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
fmt.Fprintf(w.writer, "%v\n", ev)
|
||||
case ev.IsStartSpan():
|
||||
@ -49,7 +49,7 @@ func (w *logWriter) ProcessEvent(ctx context.Context, ev event.Event) context.Co
|
||||
fmt.Fprintf(w.writer, "finish: %v %v", span.Name, span.ID)
|
||||
}
|
||||
}
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
||||
func (w *logWriter) Metric(context.Context, event.MetricData) {}
|
||||
|
@ -84,9 +84,9 @@ func Connect(config *Config) *Exporter {
|
||||
return exporter
|
||||
}
|
||||
|
||||
func (e *Exporter) ProcessEvent(ctx context.Context, ev event.Event) context.Context {
|
||||
func (e *Exporter) ProcessEvent(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
if !ev.IsEndSpan() {
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
@ -94,7 +94,7 @@ func (e *Exporter) ProcessEvent(ctx context.Context, ev event.Event) context.Con
|
||||
if span != nil {
|
||||
e.spans = append(e.spans, span)
|
||||
}
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
||||
func (e *Exporter) Metric(ctx context.Context, data event.MetricData) {
|
||||
|
@ -179,7 +179,7 @@ func TestTrace(t *testing.T) {
|
||||
Type: event.EndSpanType,
|
||||
At: end,
|
||||
}
|
||||
ctx := export.ContextSpan(ctx, startEvent)
|
||||
ctx, _ := export.ContextSpan(ctx, startEvent)
|
||||
span := export.GetSpan(ctx)
|
||||
span.ID = export.SpanContext{}
|
||||
span.Events = []event.Event{tt.event(ctx)}
|
||||
|
@ -10,15 +10,26 @@ import (
|
||||
"golang.org/x/tools/internal/telemetry/event"
|
||||
)
|
||||
|
||||
// Tag returns a context updated with tag values from the event.
|
||||
// It ignores events that are not or type IsTag or IsStartSpan.
|
||||
func Tag(ctx context.Context, ev event.Event) context.Context {
|
||||
// Tag manipulates the context using the event.
|
||||
// If the event is type IsTag or IsStartSpan then it returns a context updated
|
||||
// with tag values from the event.
|
||||
// For all other event types the event tags will be updated with values from the
|
||||
// context if they are missing.
|
||||
func Tag(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
//TODO: Do we need to do something more efficient than just store tags
|
||||
//TODO: directly on the context?
|
||||
if ev.IsTag() || ev.IsStartSpan() {
|
||||
switch {
|
||||
case ev.IsLabel(), ev.IsStartSpan():
|
||||
for _, t := range ev.Tags {
|
||||
ctx = context.WithValue(ctx, t.Key, t.Value)
|
||||
}
|
||||
default:
|
||||
// all other types want the tags filled in if needed
|
||||
for i := range ev.Tags {
|
||||
if ev.Tags[i].Value == nil {
|
||||
ev.Tags[i].Value = ctx.Value(ev.Tags[i].Key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
@ -46,9 +46,9 @@ func GetSpan(ctx context.Context) *Span {
|
||||
// It creates new spans on EventStartSpan, adds events to the current span on
|
||||
// EventLog or EventTag, and closes the span on EventEndSpan.
|
||||
// The span structure can then be used by other exporters.
|
||||
func ContextSpan(ctx context.Context, ev event.Event) context.Context {
|
||||
func ContextSpan(ctx context.Context, ev event.Event) (context.Context, event.Event) {
|
||||
switch {
|
||||
case ev.IsLog(), ev.IsTag():
|
||||
case ev.IsLog(), ev.IsLabel():
|
||||
if span := GetSpan(ctx); span != nil {
|
||||
span.Events = append(span.Events, ev)
|
||||
}
|
||||
@ -71,9 +71,9 @@ func ContextSpan(ctx context.Context, ev event.Event) context.Context {
|
||||
span.Finish = ev.At
|
||||
}
|
||||
case ev.IsDetach():
|
||||
return context.WithValue(ctx, spanContextKey, nil)
|
||||
return context.WithValue(ctx, spanContextKey, nil), ev
|
||||
}
|
||||
return ctx
|
||||
return ctx, ev
|
||||
}
|
||||
|
||||
func (s *SpanContext) Format(f fmt.State, r rune) {
|
||||
|
@ -202,7 +202,7 @@ type HistogramFloat64Row struct {
|
||||
}
|
||||
|
||||
func getGroup(ctx context.Context, g *[]event.TagList, keys []*event.Key) (int, bool) {
|
||||
group := event.Tags(ctx, keys...)
|
||||
group := event.Query(ctx, keys...)
|
||||
old := *g
|
||||
index := sort.Search(len(old), func(i int) bool {
|
||||
return !old[i].Less(group)
|
||||
|
Loading…
Reference in New Issue
Block a user