1
0
mirror of https://github.com/golang/go synced 2024-09-30 14:28:33 -06:00

internal/telemetry: clean up the exporter api

this shuffles things so there a single exporter API rather than an observer
It also removes most of the globals.
per telemetry type.

Change-Id: Iaa82abe2ded1fff9df8e067ed4a55bcbd9d9591f
Reviewed-on: https://go-review.googlesource.com/c/tools/+/190405
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
This commit is contained in:
Ian Cottrell 2019-08-14 12:51:42 -04:00 committed by Ian Cottrell
parent 2562441715
commit 5b08f89bfc
34 changed files with 754 additions and 674 deletions

View File

@ -6,4 +6,5 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190723021737-8bb11ff117ca h1:SqwJrz6xPBlCUltcEHz2/p01HRPR+VGD+aYLikk8uas=
golang.org/x/tools v0.0.0-20190723021737-8bb11ff117ca/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -25,7 +25,8 @@ import (
"golang.org/x/tools/internal/lsp/protocol"
"golang.org/x/tools/internal/lsp/source"
"golang.org/x/tools/internal/span"
"golang.org/x/tools/internal/telemetry/ocagent"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/export/ocagent"
"golang.org/x/tools/internal/tool"
"golang.org/x/tools/internal/xcontext"
errors "golang.org/x/xerrors"
@ -112,7 +113,7 @@ gopls flags are:
// If no arguments are passed it will invoke the server sub command, as a
// temporary measure for compatibility.
func (app *Application) Run(ctx context.Context, args ...string) error {
ocagent.Export(app.name, app.OCAgent)
export.AddExporters(ocagent.Connect(app.name, app.OCAgent))
app.Serve.app = app
if len(args) == 0 {
tool.Main(ctx, &app.Serve, args)

View File

@ -21,7 +21,6 @@ import (
"golang.org/x/tools/internal/lsp"
"golang.org/x/tools/internal/lsp/debug"
"golang.org/x/tools/internal/lsp/telemetry"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/trace"
"golang.org/x/tools/internal/tool"
errors "golang.org/x/xerrors"
@ -166,9 +165,9 @@ func (h *handler) Request(ctx context.Context, direction jsonrpc2.Direction, r *
mode = telemetry.Inbound
}
ctx, stats.close = trace.StartSpan(ctx, r.Method,
tag.Tag{Key: telemetry.Method, Value: r.Method},
tag.Tag{Key: telemetry.RPCDirection, Value: mode},
tag.Tag{Key: telemetry.RPCID, Value: r.ID},
telemetry.Method.Of(r.Method),
telemetry.RPCDirection.Of(mode),
telemetry.RPCID.Of(r.ID),
)
telemetry.Started.Record(ctx, 1)
_, stats.delivering = trace.StartSpan(ctx, "queued")

View File

@ -1,111 +0,0 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debug
import (
"bytes"
"fmt"
"net/http"
"sort"
"golang.org/x/tools/internal/telemetry/metric"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/worker"
)
type prometheus struct {
metrics []metric.Data
}
func (p *prometheus) observeMetric(data metric.Data) {
name := data.Handle().Name()
index := sort.Search(len(p.metrics), func(i int) bool {
return p.metrics[i].Handle().Name() >= name
})
if index >= len(p.metrics) || p.metrics[index].Handle().Name() != name {
old := p.metrics
p.metrics = make([]metric.Data, len(old)+1)
copy(p.metrics, old[:index])
copy(p.metrics[index+1:], old[index:])
}
p.metrics[index] = data
}
func (p *prometheus) header(w http.ResponseWriter, name, description string, isGauge, isHistogram bool) {
kind := "counter"
if isGauge {
kind = "gauge"
}
if isHistogram {
kind = "histogram"
}
fmt.Fprintf(w, "# HELP %s %s\n", name, description)
fmt.Fprintf(w, "# TYPE %s %s\n", name, kind)
}
func (p *prometheus) row(w http.ResponseWriter, name string, group tag.List, extra string, value interface{}) {
fmt.Fprint(w, name)
buf := &bytes.Buffer{}
fmt.Fprint(buf, group)
if extra != "" {
if buf.Len() > 0 {
fmt.Fprint(buf, ",")
}
fmt.Fprint(buf, extra)
}
if buf.Len() > 0 {
fmt.Fprint(w, "{")
buf.WriteTo(w)
fmt.Fprint(w, "}")
}
fmt.Fprintf(w, " %v\n", value)
}
func (p *prometheus) serve(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
worker.Do(func() {
defer close(done)
for _, data := range p.metrics {
switch data := data.(type) {
case *metric.Int64Data:
p.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
p.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.Float64Data:
p.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
p.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.HistogramInt64Data:
p.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
p.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
p.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
p.row(w, data.Info.Name+"_count", group, "", row.Count)
p.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
case *metric.HistogramFloat64Data:
p.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
p.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
p.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
p.row(w, data.Info.Name+"_count", group, "", row.Count)
p.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
}
}
})
<-done
}

View File

@ -5,13 +5,15 @@
package debug
import (
"context"
"fmt"
"html/template"
"log"
"net/http"
"sort"
"golang.org/x/tools/internal/lsp/telemetry"
tlm "golang.org/x/tools/internal/lsp/telemetry"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/metric"
)
@ -88,13 +90,16 @@ type rpcCodeBucket struct {
Count int64
}
func (r *rpcs) observeMetric(data metric.Data) {
func (r *rpcs) StartSpan(ctx context.Context, span *telemetry.Span) {}
func (r *rpcs) FinishSpan(ctx context.Context, span *telemetry.Span) {}
func (r *rpcs) Log(ctx context.Context, event telemetry.Event) {}
func (r *rpcs) Metric(ctx context.Context, data telemetry.MetricData) {
for i, group := range data.Groups() {
set := &r.Inbound
if group.Get(telemetry.RPCDirection) == telemetry.Outbound {
if group.Get(tlm.RPCDirection) == tlm.Outbound {
set = &r.Outbound
}
method, ok := group.Get(telemetry.Method).(string)
method, ok := group.Get(tlm.Method).(string)
if !ok {
log.Printf("Not a method... %v", group)
continue
@ -114,7 +119,7 @@ func (r *rpcs) observeMetric(data metric.Data) {
case started:
stats.Started = data.(*metric.Int64Data).Rows[i]
case completed:
status, ok := group.Get(telemetry.StatusCode).(string)
status, ok := group.Get(tlm.StatusCode).(string)
if !ok {
log.Printf("Not status... %v", group)
continue

View File

@ -19,11 +19,10 @@ import (
"sync"
"golang.org/x/tools/internal/span"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/export/prometheus"
"golang.org/x/tools/internal/telemetry/log"
"golang.org/x/tools/internal/telemetry/metric"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/trace"
"golang.org/x/tools/internal/telemetry/worker"
)
type Cache interface {
@ -216,12 +215,10 @@ func Serve(ctx context.Context, addr string) error {
return err
}
log.Print(ctx, "Debug serving", tag.Of("Port", listener.Addr().(*net.TCPAddr).Port))
prometheus := prometheus{}
metric.RegisterObservers(prometheus.observeMetric)
rpcs := rpcs{}
metric.RegisterObservers(rpcs.observeMetric)
traces := traces{}
trace.RegisterObservers(traces.export)
prometheus := prometheus.New()
rpcs := &rpcs{}
traces := &traces{}
export.AddExporters(prometheus, rpcs, traces)
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/", Render(mainTmpl, func(*http.Request) interface{} { return data }))
@ -231,7 +228,7 @@ func Serve(ctx context.Context, addr string) error {
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.HandleFunc("/metrics/", prometheus.serve)
mux.HandleFunc("/metrics/", prometheus.Serve)
mux.HandleFunc("/rpc/", Render(rpcTmpl, rpcs.getData))
mux.HandleFunc("/trace/", Render(traceTmpl, traces.getData))
mux.HandleFunc("/cache/", Render(cacheTmpl, getCache))
@ -252,7 +249,7 @@ func Serve(ctx context.Context, addr string) error {
func Render(tmpl *template.Template, fun func(*http.Request) interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
worker.Do(func() {
export.Do(func() {
defer close(done)
var data interface{}
if fun != nil {

View File

@ -6,6 +6,7 @@ package debug
import (
"bytes"
"context"
"fmt"
"html/template"
"net/http"
@ -13,8 +14,7 @@ import (
"strings"
"time"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/trace"
"golang.org/x/tools/internal/telemetry"
)
var traceTmpl = template.Must(template.Must(BaseTemplate.Clone()).Parse(`
@ -36,7 +36,7 @@ var traceTmpl = template.Must(template.Must(BaseTemplate.Clone()).Parse(`
type traces struct {
sets map[string]*traceSet
unfinished map[trace.SpanID]*traceData
unfinished map[telemetry.SpanContext]*traceData
}
type traceResults struct {
@ -51,8 +51,9 @@ type traceSet struct {
}
type traceData struct {
ID trace.SpanID
ParentID trace.SpanID
TraceID telemetry.TraceID
SpanID telemetry.SpanID
ParentID telemetry.SpanID
Name string
Start time.Time
Finish time.Time
@ -69,56 +70,48 @@ type traceEvent struct {
Tags string
}
func (t *traces) export(span *trace.Span) {
func (t *traces) StartSpan(ctx context.Context, span *telemetry.Span) {
if t.sets == nil {
t.sets = make(map[string]*traceSet)
t.unfinished = make(map[trace.SpanID]*traceData)
t.unfinished = make(map[telemetry.SpanContext]*traceData)
}
// is this a completed span?
if span.Finish.IsZero() {
t.start(span)
} else {
t.finish(span)
}
}
func (t *traces) start(span *trace.Span) {
// just starting, add it to the unfinished map
td := &traceData{
ID: span.SpanID,
TraceID: span.ID.TraceID,
SpanID: span.ID.SpanID,
ParentID: span.ParentID,
Name: span.Name,
Start: span.Start,
Tags: renderTags(span.Tags),
}
t.unfinished[span.SpanID] = td
t.unfinished[span.ID] = td
// and wire up parents if we have them
if !span.ParentID.IsValid() {
return
}
parent, found := t.unfinished[span.ParentID]
parentID := telemetry.SpanContext{TraceID: span.ID.TraceID, SpanID: span.ParentID}
parent, found := t.unfinished[parentID]
if !found {
// trace had an invalid parent, so it cannot itself be valid
return
}
parent.Children = append(parent.Children, td)
}
func (t *traces) finish(span *trace.Span) {
func (t *traces) FinishSpan(ctx context.Context, span *telemetry.Span) {
// finishing, must be already in the map
td, found := t.unfinished[span.SpanID]
td, found := t.unfinished[span.ID]
if !found {
return // if this happens we are in a bad place
}
delete(t.unfinished, span.SpanID)
delete(t.unfinished, span.ID)
td.Finish = span.Finish
td.Duration = span.Finish.Sub(span.Start)
td.Events = make([]traceEvent, len(span.Events))
for i, event := range span.Events {
td.Events[i] = traceEvent{
Time: event.Time,
Time: event.At,
Tags: renderTags(event.Tags),
}
}
@ -137,6 +130,10 @@ func (t *traces) finish(span *trace.Span) {
}
}
func (t *traces) Log(ctx context.Context, event telemetry.Event) {}
func (t *traces) Metric(ctx context.Context, data telemetry.MetricData) {}
func (t *traces) getData(req *http.Request) interface{} {
if len(t.sets) == 0 {
return nil
@ -163,7 +160,7 @@ func fillOffsets(td *traceData, start time.Time) {
}
}
func renderTags(tags tag.List) string {
func renderTags(tags telemetry.TagList) string {
buf := &bytes.Buffer{}
for _, tag := range tags {
fmt.Fprintf(buf, "%v=%q ", tag.Key, tag.Value)

View File

@ -22,7 +22,6 @@ import (
"golang.org/x/tools/internal/lsp/source"
"golang.org/x/tools/internal/lsp/tests"
"golang.org/x/tools/internal/span"
"golang.org/x/tools/internal/telemetry/log"
)
func TestLSP(t *testing.T) {
@ -42,8 +41,6 @@ func testLSP(t *testing.T, exporter packagestest.Exporter) {
data := tests.Load(t, exporter, "testdata")
defer data.Exported.Cleanup()
log.AddLogger(log.NullLogger)
cache := cache.New()
session := cache.NewSession(ctx)
view := session.NewView(ctx, viewName, span.FileURI(data.Config.Dir))

View File

@ -3,15 +3,14 @@ package protocol
import (
"context"
"fmt"
"time"
"golang.org/x/tools/internal/telemetry/log"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/xcontext"
)
func init() {
log.AddLogger(logger)
export.AddExporters(logExporter{})
}
type contextKey int
@ -24,17 +23,21 @@ func WithClient(ctx context.Context, client Client) context.Context {
return context.WithValue(ctx, clientKey, client)
}
// logger implements log.Logger in terms of the LogMessage call to a client.
func logger(ctx context.Context, at time.Time, tags tag.List) bool {
// logExporter sends the log event back to the client if there is one stored on the
// context.
type logExporter struct{}
func (logExporter) StartSpan(context.Context, *telemetry.Span) {}
func (logExporter) FinishSpan(context.Context, *telemetry.Span) {}
func (logExporter) Log(ctx context.Context, event telemetry.Event) {
client, ok := ctx.Value(clientKey).(Client)
if !ok {
return false
return
}
entry := log.ToEntry(ctx, time.Time{}, tags)
msg := &LogMessageParams{Type: Info, Message: fmt.Sprint(entry)}
if entry.Error != nil {
msg := &LogMessageParams{Type: Info, Message: fmt.Sprint(event)}
if event.Error != nil {
msg.Type = Error
}
go client.LogMessage(xcontext.Detach(ctx), msg)
return true
}
func (logExporter) Metric(context.Context, telemetry.MetricData) {}

View File

@ -21,7 +21,6 @@ import (
"golang.org/x/tools/internal/lsp/source"
"golang.org/x/tools/internal/lsp/tests"
"golang.org/x/tools/internal/span"
"golang.org/x/tools/internal/telemetry/log"
)
func TestSource(t *testing.T) {
@ -39,8 +38,6 @@ func testSource(t *testing.T, exporter packagestest.Exporter) {
data := tests.Load(t, exporter, "../testdata")
defer data.Exported.Cleanup()
log.AddLogger(log.NullLogger)
cache := cache.New()
session := cache.NewSession(ctx)
r := &runner{

View File

@ -0,0 +1,25 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package telemetry
import "context"
type contextKeyType int
const (
spanContextKey = contextKeyType(iota)
)
func WithSpan(ctx context.Context, span *Span) context.Context {
return context.WithValue(ctx, spanContextKey, span)
}
func GetSpan(ctx context.Context) *Span {
v := ctx.Value(spanContextKey)
if v == nil {
return nil
}
return v.(*Span)
}

View File

@ -0,0 +1,30 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package telemetry
import (
"fmt"
"time"
)
type Event struct {
At time.Time
Message string
Error error
Tags TagList
}
func (e Event) Format(f fmt.State, r rune) {
if !e.At.IsZero() {
fmt.Fprint(f, e.At.Format("2006/01/02 15:04:05 "))
}
fmt.Fprint(f, e.Message)
if e.Error != nil {
fmt.Fprintf(f, ": %v", e.Error)
}
for _, tag := range e.Tags {
fmt.Fprintf(f, "\n\t%v = %v", tag.Key, tag.Value)
}
}

View File

@ -0,0 +1,94 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package export holds the definition of the telemetry Exporter interface,
// along with some simple implementations.
// Larger more complex exporters are in sub packages of their own.
package export
import (
"context"
"os"
"time"
"golang.org/x/tools/internal/telemetry"
)
type Exporter interface {
StartSpan(context.Context, *telemetry.Span)
FinishSpan(context.Context, *telemetry.Span)
// Log is a function that handles logging events.
// Observers may use information in the context to decide what to do with a
// given log event. They should return true if they choose to handle the
Log(context.Context, telemetry.Event)
Metric(context.Context, telemetry.MetricData)
}
var exporter = LogWriter(os.Stderr, true)
func SetExporter(setter func(Exporter) Exporter) {
Do(func() {
exporter = setter(exporter)
})
}
func AddExporters(e ...Exporter) {
Do(func() {
exporter = Multi(append([]Exporter{exporter}, e...)...)
})
}
func StartSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
Do(func() {
span.Start = at
exporter.StartSpan(ctx, span)
})
}
func FinishSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
Do(func() {
span.Finish = at
exporter.FinishSpan(ctx, span)
})
}
func Tag(ctx context.Context, at time.Time, tags telemetry.TagList) {
Do(func() {
// If context has a span we need to add the tags to it
span := telemetry.GetSpan(ctx)
if span == nil {
return
}
if span.Start.IsZero() {
// span still being created, tag it directly
span.Tags = append(span.Tags, tags...)
return
}
// span in progress, add an event to the span
span.Events = append(span.Events, telemetry.Event{
At: at,
Tags: tags,
})
})
}
func Log(ctx context.Context, event telemetry.Event) {
Do(func() {
// If context has a span we need to add the event to it
span := telemetry.GetSpan(ctx)
if span != nil {
span.Events = append(span.Events, event)
}
// and now also hand the event of to the current observer
exporter.Log(ctx, event)
})
}
func Metric(ctx context.Context, data telemetry.MetricData) {
Do(func() {
exporter.Metric(ctx, data)
})
}

View File

@ -0,0 +1,37 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package export
import (
"context"
"fmt"
"io"
"golang.org/x/tools/internal/telemetry"
)
// LogWriter returns an observer that logs events to the supplied writer.
// If onlyErrors is true it does not log any event that did not have an
// associated error.
// It ignores all telemetry other than log events.
func LogWriter(w io.Writer, onlyErrors bool) Exporter {
return &logWriter{writer: w, onlyErrors: onlyErrors}
}
type logWriter struct {
writer io.Writer
onlyErrors bool
}
func (w *logWriter) StartSpan(context.Context, *telemetry.Span) {}
func (w *logWriter) FinishSpan(context.Context, *telemetry.Span) {}
func (w *logWriter) Log(ctx context.Context, event telemetry.Event) {
if event.Error == nil {
// we only log errors by default
return
}
fmt.Fprintf(w.writer, "%v\n", event)
}
func (w *logWriter) Metric(context.Context, telemetry.MetricData) {}

View File

@ -0,0 +1,50 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package export
import (
"context"
"golang.org/x/tools/internal/telemetry"
)
// Multi returns an exporter that invokes all the exporters given to it in order.
func Multi(e ...Exporter) Exporter {
a := make(multi, 0, len(e))
for _, i := range e {
if i == nil {
continue
}
if i, ok := i.(multi); ok {
a = append(a, i...)
continue
}
a = append(a, i)
}
return a
}
type multi []Exporter
func (m multi) StartSpan(ctx context.Context, span *telemetry.Span) {
for _, o := range m {
o.StartSpan(ctx, span)
}
}
func (m multi) FinishSpan(ctx context.Context, span *telemetry.Span) {
for _, o := range m {
o.FinishSpan(ctx, span)
}
}
func (m multi) Log(ctx context.Context, event telemetry.Event) {
for _, o := range m {
o.Log(ctx, event)
}
}
func (m multi) Metric(ctx context.Context, data telemetry.MetricData) {
for _, o := range m {
o.Metric(ctx, data)
}
}

View File

@ -0,0 +1,23 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package export
import (
"context"
"golang.org/x/tools/internal/telemetry"
)
// Null returns an observer that does nothing.
func Null() Exporter {
return null{}
}
type null struct{}
func (null) StartSpan(context.Context, *telemetry.Span) {}
func (null) FinishSpan(context.Context, *telemetry.Span) {}
func (null) Log(context.Context, telemetry.Event) {}
func (null) Metric(context.Context, telemetry.MetricData) {}

View File

@ -9,18 +9,17 @@ package ocagent
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"golang.org/x/tools/internal/telemetry/log"
"golang.org/x/tools/internal/telemetry/metric"
"golang.org/x/tools/internal/telemetry/ocagent/wire"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/export/ocagent/wire"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/trace"
"golang.org/x/tools/internal/telemetry/worker"
)
const DefaultAddress = "http://localhost:55678"
@ -33,9 +32,12 @@ type exporter struct {
metrics []*wire.Metric
}
func Export(service, address string) {
// Connect creates a process specific exporter with the specified
// serviceName and the address of the ocagent to which it will upload
// its telemetry.
func Connect(service, address string) export.Exporter {
if address == "off" {
return
return nil
}
hostname, _ := os.Hostname()
exporter := &exporter{
@ -59,26 +61,25 @@ func Export(service, address string) {
if exporter.address == "" {
exporter.address = DefaultAddress
}
//TODO: add metrics once the ocagent json metric interface works
trace.RegisterObservers(exporter.observeTrace)
go func() {
for _ = range time.Tick(exportRate) {
worker.Do(func() {
export.Do(func() {
exporter.flush()
})
}
}()
return exporter
}
func (e *exporter) observeTrace(span *trace.Span) {
// is this a completed span?
if span.Finish.IsZero() {
return
}
func (e *exporter) StartSpan(ctx context.Context, span *telemetry.Span) {}
func (e *exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {
e.spans = append(e.spans, convertSpan(span))
}
func (e *exporter) observeMetric(data metric.Data) {
func (e *exporter) Log(context.Context, telemetry.Event) {}
func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
e.metrics = append(e.metrics, convertMetric(data))
}
@ -142,10 +143,10 @@ func toTruncatableString(s string) *wire.TruncatableString {
return &wire.TruncatableString{Value: s}
}
func convertSpan(span *trace.Span) *wire.Span {
func convertSpan(span *telemetry.Span) *wire.Span {
result := &wire.Span{
TraceId: span.TraceID[:],
SpanId: span.SpanID[:],
TraceId: span.ID.TraceID[:],
SpanId: span.ID.SpanID[:],
TraceState: nil, //TODO?
ParentSpanId: span.ParentID[:],
Name: toTruncatableString(span.Name),
@ -163,11 +164,11 @@ func convertSpan(span *trace.Span) *wire.Span {
return result
}
func convertMetric(data metric.Data) *wire.Metric {
func convertMetric(data telemetry.MetricData) *wire.Metric {
return nil //TODO:
}
func convertAttributes(tags tag.List) *wire.Attributes {
func convertAttributes(tags telemetry.TagList) *wire.Attributes {
if len(tags) == 0 {
return nil
}
@ -213,7 +214,7 @@ func convertAttribute(v interface{}) wire.Attribute {
}
}
func convertEvents(events []trace.Event) *wire.TimeEvents {
func convertEvents(events []telemetry.Event) *wire.TimeEvents {
//TODO: MessageEvents?
result := make([]wire.TimeEvent, len(events))
for i, event := range events {
@ -222,27 +223,26 @@ func convertEvents(events []trace.Event) *wire.TimeEvents {
return &wire.TimeEvents{TimeEvent: result}
}
func convertEvent(event trace.Event) wire.TimeEvent {
func convertEvent(event telemetry.Event) wire.TimeEvent {
return wire.TimeEvent{
Time: convertTimestamp(event.Time),
Annotation: convertAnnotation(event.Tags),
Time: convertTimestamp(event.At),
Annotation: convertAnnotation(event),
}
}
func convertAnnotation(tags tag.List) *wire.Annotation {
if len(tags) == 0 {
func convertAnnotation(event telemetry.Event) *wire.Annotation {
description := event.Message
if description == "" && event.Error != nil {
description = event.Error.Error()
event.Error = nil
}
tags := event.Tags
if event.Error != nil {
tags = append(tags, tag.Of("Error", event.Error))
}
if description == "" && len(tags) == 0 {
return nil
}
entry := log.ToEntry(nil, time.Time{}, tags)
description := entry.Message
if description == "" && entry.Error != nil {
description = entry.Error.Error()
entry.Error = nil
}
tags = entry.Tags
if entry.Error != nil {
tags = append(tags, tag.Of("Error", entry.Error))
}
return &wire.Annotation{
Description: toTruncatableString(description),
Attributes: convertAttributes(tags),

View File

@ -10,27 +10,28 @@ import (
"reflect"
"testing"
"golang.org/x/tools/internal/telemetry/log"
"golang.org/x/tools/internal/telemetry/ocagent/wire"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export/ocagent/wire"
"golang.org/x/tools/internal/telemetry/tag"
)
func TestConvert_annotation(t *testing.T) {
tests := []struct {
name string
tagList tag.List
want *wire.Annotation
name string
event telemetry.Event
want *wire.Annotation
}{
{
name: "no tags",
tagList: nil,
want: nil,
name: "no tags",
want: nil,
},
{
name: "description no error",
tagList: tag.List{
tag.Of(log.MessageTag, "cache miss"),
tag.Of("db", "godb"),
event: telemetry.Event{
Message: "cache miss",
Tags: telemetry.TagList{
tag.Of("db", "godb"),
},
},
want: &wire.Annotation{
Description: &wire.TruncatableString{Value: "cache miss"},
@ -44,10 +45,12 @@ func TestConvert_annotation(t *testing.T) {
{
name: "description and error",
tagList: tag.List{
tag.Of(log.MessageTag, "cache miss"),
tag.Of("db", "godb"),
tag.Of(log.ErrorTag, errors.New("no network connectivity")),
event: telemetry.Event{
Message: "cache miss",
Error: errors.New("no network connectivity"),
Tags: telemetry.TagList{
tag.Of("db", "godb"),
},
},
want: &wire.Annotation{
Description: &wire.TruncatableString{Value: "cache miss"},
@ -61,9 +64,11 @@ func TestConvert_annotation(t *testing.T) {
},
{
name: "no description, but error",
tagList: tag.List{
tag.Of("db", "godb"),
tag.Of(log.ErrorTag, errors.New("no network connectivity")),
event: telemetry.Event{
Error: errors.New("no network connectivity"),
Tags: telemetry.TagList{
tag.Of("db", "godb"),
},
},
want: &wire.Annotation{
Description: &wire.TruncatableString{Value: "no network connectivity"},
@ -76,28 +81,30 @@ func TestConvert_annotation(t *testing.T) {
},
{
name: "enumerate all attribute types",
tagList: tag.List{
tag.Of(log.MessageTag, "cache miss"),
tag.Of("db", "godb"),
event: telemetry.Event{
Message: "cache miss",
Tags: telemetry.TagList{
tag.Of("db", "godb"),
tag.Of("age", 0.456), // Constant converted into "float64"
tag.Of("ttl", float32(5000)),
tag.Of("expiry_ms", float64(1e3)),
tag.Of("age", 0.456), // Constant converted into "float64"
tag.Of("ttl", float32(5000)),
tag.Of("expiry_ms", float64(1e3)),
tag.Of("retry", false),
tag.Of("stale", true),
tag.Of("retry", false),
tag.Of("stale", true),
tag.Of("max", 0x7fff), // Constant converted into "int"
tag.Of("opcode", int8(0x7e)),
tag.Of("base", int16(1<<9)),
tag.Of("checksum", int32(0x11f7e294)),
tag.Of("mode", int64(0644)),
tag.Of("max", 0x7fff), // Constant converted into "int"
tag.Of("opcode", int8(0x7e)),
tag.Of("base", int16(1<<9)),
tag.Of("checksum", int32(0x11f7e294)),
tag.Of("mode", int64(0644)),
tag.Of("min", uint(1)),
tag.Of("mix", uint8(44)),
tag.Of("port", uint16(55678)),
tag.Of("min_hops", uint32(1<<9)),
tag.Of("max_hops", uint64(0xffffff)),
tag.Of("min", uint(1)),
tag.Of("mix", uint8(44)),
tag.Of("port", uint16(55678)),
tag.Of("min_hops", uint32(1<<9)),
tag.Of("max_hops", uint64(0xffffff)),
},
},
want: &wire.Annotation{
Description: &wire.TruncatableString{Value: "cache miss"},
@ -131,7 +138,7 @@ func TestConvert_annotation(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := convertAnnotation(tt.tagList)
got := convertAnnotation(tt.event)
if !reflect.DeepEqual(got, tt.want) {
t.Fatalf("Got:\n%s\nWant:\n%s", marshaled(got), marshaled(tt.want))
}

View File

@ -0,0 +1,123 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package prometheus
import (
"bytes"
"context"
"fmt"
"net/http"
"sort"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/metric"
)
func New() *Exporter {
return &Exporter{}
}
type Exporter struct {
metrics []telemetry.MetricData
}
func (e *Exporter) StartSpan(ctx context.Context, span *telemetry.Span) {}
func (e *Exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {}
func (e *Exporter) Log(ctx context.Context, event telemetry.Event) {}
func (e *Exporter) Metric(ctx context.Context, data telemetry.MetricData) {
name := data.Handle()
// We keep the metrics in name sorted order so the page is stable and easy
// to read. We do this with an insertion sort rather than sorting the list
// each time
index := sort.Search(len(e.metrics), func(i int) bool {
return e.metrics[i].Handle() >= name
})
if index >= len(e.metrics) || e.metrics[index].Handle() != name {
// we have a new metric, so we need to make a space for it
old := e.metrics
e.metrics = make([]telemetry.MetricData, len(old)+1)
copy(e.metrics, old[:index])
copy(e.metrics[index+1:], old[index:])
}
e.metrics[index] = data
}
func (e *Exporter) header(w http.ResponseWriter, name, description string, isGauge, isHistogram bool) {
kind := "counter"
if isGauge {
kind = "gauge"
}
if isHistogram {
kind = "histogram"
}
fmt.Fprintf(w, "# HELP %s %s\n", name, description)
fmt.Fprintf(w, "# TYPE %s %s\n", name, kind)
}
func (e *Exporter) row(w http.ResponseWriter, name string, group telemetry.TagList, extra string, value interface{}) {
fmt.Fprint(w, name)
buf := &bytes.Buffer{}
fmt.Fprint(buf, group)
if extra != "" {
if buf.Len() > 0 {
fmt.Fprint(buf, ",")
}
fmt.Fprint(buf, extra)
}
if buf.Len() > 0 {
fmt.Fprint(w, "{")
buf.WriteTo(w)
fmt.Fprint(w, "}")
}
fmt.Fprintf(w, " %v\n", value)
}
func (e *Exporter) Serve(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
export.Do(func() {
defer close(done)
for _, data := range e.metrics {
switch data := data.(type) {
case *metric.Int64Data:
e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
e.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.Float64Data:
e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
for i, group := range data.Groups() {
e.row(w, data.Info.Name, group, "", data.Rows[i])
}
case *metric.HistogramInt64Data:
e.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
e.row(w, data.Info.Name+"_count", group, "", row.Count)
e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
case *metric.HistogramFloat64Data:
e.header(w, data.Info.Name, data.Info.Description, false, true)
for i, group := range data.Groups() {
row := data.Rows[i]
for j, b := range data.Info.Buckets {
e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
e.row(w, data.Info.Name+"_count", group, "", row.Count)
e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
}
}
})
<-done
}

View File

@ -2,9 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package worker provides a very simple mechanism to allow telemetry packages
// to work cooperatively and efficiently.
package worker
package export
import (
"fmt"
@ -12,6 +10,8 @@ import (
)
var (
// TODO: Think about whether this is the right concurrency model, and what
// TODO: the queue length should be
workQueue = make(chan func(), 1000)
)

View File

@ -2,8 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package tag adds support for telemetry tracins.
package trace
package telemetry
import (
crand "crypto/rand"
@ -49,7 +48,7 @@ func initGenerator() {
spanIDInc |= 1
}
func newTraceID() TraceID {
func NewTraceID() TraceID {
generationMu.Lock()
defer generationMu.Unlock()
if traceIDRand == nil {
@ -61,7 +60,7 @@ func newTraceID() TraceID {
return tid
}
func newSpanID() SpanID {
func NewSpanID() SpanID {
var id uint64
for id == 0 {
id = atomic.AddUint64(&nextSpanID, spanIDInc)

View File

@ -1,51 +0,0 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package log
import (
"context"
"fmt"
"time"
"golang.org/x/tools/internal/telemetry/tag"
)
type Entry struct {
At time.Time
Message string
Error error
Tags tag.List
}
func ToEntry(ctx context.Context, at time.Time, tags tag.List) Entry {
//TODO: filter more efficiently for the common case of stripping prefixes only
entry := Entry{
At: at,
}
for _, t := range tags {
switch t.Key {
case MessageTag:
entry.Message = t.Value.(string)
case ErrorTag:
entry.Error = t.Value.(error)
default:
entry.Tags = append(entry.Tags, t)
}
}
return entry
}
func (e Entry) Format(f fmt.State, r rune) {
if !e.At.IsZero() {
fmt.Fprint(f, e.At.Format("2006/01/02 15:04:05 "))
}
fmt.Fprint(f, e.Message)
if e.Error != nil {
fmt.Fprintf(f, ": %v", e.Error)
}
for _, tag := range e.Tags {
fmt.Fprintf(f, "\n\t%v = %v", tag.Key, tag.Value)
}
}

View File

@ -8,40 +8,45 @@ package log
import (
"context"
"fmt"
"os"
"time"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/worker"
)
const (
// The well known tag keys for the logging system.
MessageTag = tag.Key("message")
ErrorTag = tag.Key("error")
)
// Logger is a function that handles logging messages.
// Loggers are registered at start up, and may use information in the context
// to decide what to do with a given log message.
type Logger func(ctx context.Context, at time.Time, tags tag.List) bool
type Event telemetry.Event
// With sends a tag list to the installed loggers.
func With(ctx context.Context, tags ...tag.Tag) {
at := time.Now()
worker.Do(func() {
deliver(ctx, at, tags)
func With(ctx context.Context, tags ...telemetry.Tag) {
export.Log(ctx, telemetry.Event{
At: time.Now(),
Tags: tags,
})
}
// Print takes a message and a tag list and combines them into a single tag
// list before delivering them to the loggers.
func Print(ctx context.Context, message string, tags ...tag.Tagger) {
at := time.Now()
worker.Do(func() {
tags := append(tag.Tags(ctx, tags...), MessageTag.Of(message))
deliver(ctx, at, tags)
export.Log(ctx, telemetry.Event{
At: time.Now(),
Message: message,
Tags: tag.Tags(ctx, tags...),
})
}
// Print takes a message and a tag list and combines them into a single tag
// list before delivering them to the loggers.
func Error(ctx context.Context, message string, err error, tags ...tag.Tagger) {
if err == nil {
err = errorString(message)
message = ""
}
export.Log(ctx, telemetry.Event{
At: time.Now(),
Message: message,
Error: err,
Tags: tag.Tags(ctx, tags...),
})
}
@ -49,50 +54,3 @@ type errorString string
// Error allows errorString to conform to the error interface.
func (err errorString) Error() string { return string(err) }
// Print takes a message and a tag list and combines them into a single tag
// list before delivering them to the loggers.
func Error(ctx context.Context, message string, err error, tags ...tag.Tagger) {
at := time.Now()
worker.Do(func() {
if err == nil {
err = errorString(message)
message = ""
}
tags := append(tag.Tags(ctx, tags...), MessageTag.Of(message), ErrorTag.Of(err))
deliver(ctx, at, tags)
})
}
func deliver(ctx context.Context, at time.Time, tags tag.List) {
delivered := false
for _, logger := range loggers {
if logger(ctx, at, tags) {
delivered = true
}
}
if !delivered {
// no logger processed the message, so we log to stderr just in case
Stderr(ctx, at, tags)
}
}
var loggers = []Logger{}
func AddLogger(logger Logger) {
worker.Do(func() {
loggers = append(loggers, logger)
})
}
// Stderr is a logger that logs to stderr in the standard format.
func Stderr(ctx context.Context, at time.Time, tags tag.List) bool {
fmt.Fprintf(os.Stderr, "%v\n", ToEntry(ctx, at, tags))
return true
}
// NullLogger is a logger that throws away log messages and reports
// success so that the fallback stderr logging does not happen.
var NullLogger = func(context.Context, time.Time, tag.List) bool {
return true
}

View File

@ -0,0 +1,17 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package telemetry
// Data represents a single point in the time series of a metric.
// This provides the common interface to all metrics no matter their data
// format.
// To get the actual values for the metric you must type assert to a concrete
// metric type.
type MetricData interface {
// Handle returns the metric handle this data is for.
Handle() string
// Groups reports the rows that currently exist for this metric.
Groups() []TagList
}

View File

@ -9,30 +9,12 @@ import (
"context"
"sort"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/stats"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/worker"
)
// Handle uniquely identifies a constructed metric.
// It can be used to detect which observed data objects belong
// to that metric.
type Handle struct {
name string
}
// Data represents a single point in the time series of a metric.
// This provides the common interface to all metrics no matter their data
// format.
// To get the actual values for the metric you must type assert to a concrete
// metric type.
type Data interface {
// Handle returns the metric handle this data is for.
Handle() Handle
// Groups reports the rows that currently exist for this metric.
Groups() []tag.List
}
// Scalar represents the construction information for a scalar metric.
type Scalar struct {
// Name is the unique name of this metric.
@ -67,83 +49,77 @@ type HistogramFloat64 struct {
Buckets []float64
}
// Observer is the type for functions that want to observe metric values
// as they arrive.
// Each data point delivered to an observer is immutable and can be stored if
// needed.
type Observer func(Data)
// CountInt64 creates a new metric based on the Scalar information that counts
// the number of times the supplied int64 measure is set.
// Metrics of this type will use Int64Data.
func (info Scalar) CountInt64(measure *stats.Int64Measure) Handle {
func (info Scalar) CountInt64(measure *stats.Int64Measure) string {
data := &Int64Data{Info: &info}
measure.Subscribe(data.countInt64)
return Handle{info.Name}
return info.Name
}
// SumInt64 creates a new metric based on the Scalar information that sums all
// the values recorded on the int64 measure.
// Metrics of this type will use Int64Data.
func (info Scalar) SumInt64(measure *stats.Int64Measure) Handle {
func (info Scalar) SumInt64(measure *stats.Int64Measure) string {
data := &Int64Data{Info: &info}
measure.Subscribe(data.sum)
_ = data
return Handle{info.Name}
return info.Name
}
// LatestInt64 creates a new metric based on the Scalar information that tracks
// the most recent value recorded on the int64 measure.
// Metrics of this type will use Int64Data.
func (info Scalar) LatestInt64(measure *stats.Int64Measure) Handle {
func (info Scalar) LatestInt64(measure *stats.Int64Measure) string {
data := &Int64Data{Info: &info, IsGauge: true}
measure.Subscribe(data.latest)
return Handle{info.Name}
return info.Name
}
// CountFloat64 creates a new metric based on the Scalar information that counts
// the number of times the supplied float64 measure is set.
// Metrics of this type will use Int64Data.
func (info Scalar) CountFloat64(measure *stats.Float64Measure) Handle {
func (info Scalar) CountFloat64(measure *stats.Float64Measure) string {
data := &Int64Data{Info: &info}
measure.Subscribe(data.countFloat64)
return Handle{info.Name}
return info.Name
}
// SumFloat64 creates a new metric based on the Scalar information that sums all
// the values recorded on the float64 measure.
// Metrics of this type will use Float64Data.
func (info Scalar) SumFloat64(measure *stats.Float64Measure) Handle {
func (info Scalar) SumFloat64(measure *stats.Float64Measure) string {
data := &Float64Data{Info: &info}
measure.Subscribe(data.sum)
return Handle{info.Name}
return info.Name
}
// LatestFloat64 creates a new metric based on the Scalar information that tracks
// the most recent value recorded on the float64 measure.
// Metrics of this type will use Float64Data.
func (info Scalar) LatestFloat64(measure *stats.Float64Measure) Handle {
func (info Scalar) LatestFloat64(measure *stats.Float64Measure) string {
data := &Float64Data{Info: &info, IsGauge: true}
measure.Subscribe(data.latest)
return Handle{info.Name}
return info.Name
}
// Record creates a new metric based on the HistogramInt64 information that
// tracks the bucketized counts of values recorded on the int64 measure.
// Metrics of this type will use HistogramInt64Data.
func (info HistogramInt64) Record(measure *stats.Int64Measure) Handle {
func (info HistogramInt64) Record(measure *stats.Int64Measure) string {
data := &HistogramInt64Data{Info: &info}
measure.Subscribe(data.record)
return Handle{info.Name}
return info.Name
}
// Record creates a new metric based on the HistogramFloat64 information that
// tracks the bucketized counts of values recorded on the float64 measure.
// Metrics of this type will use HistogramFloat64Data.
func (info HistogramFloat64) Record(measure *stats.Float64Measure) Handle {
func (info HistogramFloat64) Record(measure *stats.Float64Measure) string {
data := &HistogramFloat64Data{Info: &info}
measure.Subscribe(data.record)
return Handle{info.Name}
return info.Name
}
// Int64Data is a concrete implementation of Data for int64 scalar metrics.
@ -155,7 +131,7 @@ type Int64Data struct {
// Rows holds the per group values for the metric.
Rows []int64
groups []tag.List
groups []telemetry.TagList
}
// Float64Data is a concrete implementation of Data for float64 scalar metrics.
@ -167,7 +143,7 @@ type Float64Data struct {
// Rows holds the per group values for the metric.
Rows []float64
groups []tag.List
groups []telemetry.TagList
}
// HistogramInt64Data is a concrete implementation of Data for int64 histogram metrics.
@ -177,7 +153,7 @@ type HistogramInt64Data struct {
// Rows holds the per group values for the metric.
Rows []*HistogramInt64Row
groups []tag.List
groups []telemetry.TagList
}
// HistogramInt64Row holds the values for a single row of a HistogramInt64Data.
@ -201,7 +177,7 @@ type HistogramFloat64Data struct {
// Rows holds the per group values for the metric.
Rows []*HistogramFloat64Row
groups []tag.List
groups []telemetry.TagList
}
// HistogramFloat64Row holds the values for a single row of a HistogramFloat64Data.
@ -218,27 +194,7 @@ type HistogramFloat64Row struct {
Max float64
}
// Name returns the name of the metric this is a handle for.
func (h Handle) Name() string { return h.name }
var observers []Observer
// RegisterObservers adds a new metric observer to the system.
// There is no way to unregister an observer.
func RegisterObservers(e ...Observer) {
worker.Do(func() {
observers = append(e, observers...)
})
}
// export must only be called from inside a worker
func export(m Data) {
for _, e := range observers {
e(m)
}
}
func getGroup(ctx context.Context, g *[]tag.List, keys []interface{}) (int, bool) {
func getGroup(ctx context.Context, g *[]telemetry.TagList, keys []interface{}) (int, bool) {
group := tag.Get(ctx, keys...)
old := *g
index := sort.Search(len(old), func(i int) bool {
@ -248,18 +204,18 @@ func getGroup(ctx context.Context, g *[]tag.List, keys []interface{}) (int, bool
// not a new group
return index, false
}
*g = make([]tag.List, len(old)+1)
*g = make([]telemetry.TagList, len(old)+1)
copy(*g, old[:index])
copy((*g)[index+1:], old[index:])
(*g)[index] = group
return index, true
}
func (data *Int64Data) Handle() Handle { return Handle{data.Info.Name} }
func (data *Int64Data) Groups() []tag.List { return data.groups }
func (data *Int64Data) Handle() string { return data.Info.Name }
func (data *Int64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
worker.Do(func() {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
@ -272,7 +228,7 @@ func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
}
data.Rows[index] = f(data.Rows[index])
frozen := *data
export(&frozen)
export.Metric(ctx, &frozen)
})
}
@ -292,11 +248,11 @@ func (data *Int64Data) latest(ctx context.Context, measure *stats.Int64Measure,
data.modify(ctx, func(v int64) int64 { return value })
}
func (data *Float64Data) Handle() Handle { return Handle{data.Info.Name} }
func (data *Float64Data) Groups() []tag.List { return data.groups }
func (data *Float64Data) Handle() string { return data.Info.Name }
func (data *Float64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64) {
worker.Do(func() {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
@ -309,7 +265,7 @@ func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64)
}
data.Rows[index] = f(data.Rows[index])
frozen := *data
export(&frozen)
export.Metric(ctx, &frozen)
})
}
@ -321,11 +277,11 @@ func (data *Float64Data) latest(ctx context.Context, measure *stats.Float64Measu
data.modify(ctx, func(v float64) float64 { return value })
}
func (data *HistogramInt64Data) Handle() Handle { return Handle{data.Info.Name} }
func (data *HistogramInt64Data) Groups() []tag.List { return data.groups }
func (data *HistogramInt64Data) Handle() string { return data.Info.Name }
func (data *HistogramInt64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramInt64Row)) {
worker.Do(func() {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramInt64Row
@ -344,7 +300,7 @@ func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramI
f(&v)
data.Rows[index] = &v
frozen := *data
export(&frozen)
export.Metric(ctx, &frozen)
})
}
@ -366,11 +322,11 @@ func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64
})
}
func (data *HistogramFloat64Data) Handle() Handle { return Handle{data.Info.Name} }
func (data *HistogramFloat64Data) Groups() []tag.List { return data.groups }
func (data *HistogramFloat64Data) Handle() string { return data.Info.Name }
func (data *HistogramFloat64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *HistogramFloat64Row)) {
worker.Do(func() {
export.Do(func() {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramFloat64Row
@ -389,7 +345,7 @@ func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *Histogra
f(&v)
data.Rows[index] = &v
frozen := *data
export(&frozen)
export.Metric(ctx, &frozen)
})
}

71
internal/telemetry/tag.go Normal file
View File

@ -0,0 +1,71 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package telemetry
import (
"context"
"fmt"
)
// Tag holds a key and value pair.
// It is normally used when passing around lists of tags.
type Tag struct {
Key interface{}
Value interface{}
}
// TagList is a way of passing around a collection of key value pairs.
// It is an alternative to the less efficient and unordered method of using
// maps.
type TagList []Tag
// Format is used for debug printing of tags.
func (t Tag) Format(f fmt.State, r rune) {
fmt.Fprintf(f, `%v="%v"`, t.Key, t.Value)
}
// Get returns the tag unmodified.
// It makes Key conform to the Tagger interface.
func (t Tag) Tag(ctx context.Context) Tag {
return t
}
// Get will get a single key's value from the list.
func (l TagList) Get(k interface{}) interface{} {
for _, t := range l {
if t.Key == k {
return t.Value
}
}
return nil
}
// Format pretty prints a list.
// It is intended only for debugging.
func (l TagList) Format(f fmt.State, r rune) {
printed := false
for _, t := range l {
if t.Value == nil {
continue
}
if printed {
fmt.Fprint(f, ",")
}
fmt.Fprint(f, t)
printed = true
}
}
// Equal returns true if two lists are identical.
func (l TagList) Equal(other TagList) bool {
//TODO: make this more efficient
return fmt.Sprint(l) == fmt.Sprint(other)
}
// Less is intended only for using tag lists as a sorting key.
func (l TagList) Less(other TagList) bool {
//TODO: make this more efficient
return fmt.Sprint(l) < fmt.Sprint(other)
}

View File

@ -7,6 +7,8 @@ package tag
import (
"context"
"golang.org/x/tools/internal/telemetry"
)
// Key represents the key for a context tag.
@ -19,25 +21,25 @@ type Key string
// Of returns a Tag for a key and value.
// This is a trivial helper that makes common logging easier to read.
func Of(key interface{}, value interface{}) Tag {
return Tag{Key: key, Value: value}
func Of(key interface{}, value interface{}) telemetry.Tag {
return telemetry.Tag{Key: key, Value: value}
}
// Of creates a new Tag with this key and the supplied value.
// You can use this when building a tag list.
func (k Key) Of(v interface{}) Tag {
return Tag{Key: k, Value: v}
func (k Key) Of(v interface{}) telemetry.Tag {
return telemetry.Tag{Key: k, Value: v}
}
// Tag can be used to get a tag for the key from a context.
// It makes Key conform to the Tagger interface.
func (k Key) Tag(ctx context.Context) Tag {
return Tag{Key: k, Value: ctx.Value(k)}
func (k Key) Tag(ctx context.Context) telemetry.Tag {
return telemetry.Tag{Key: k, Value: ctx.Value(k)}
}
// With applies sets this key to the supplied value on the context and
// returns the new context generated.
// It uses the With package level function so that observers are also notified.
func (k Key) With(ctx context.Context, v interface{}) context.Context {
return With(ctx, Tag{Key: k, Value: v})
return With(ctx, telemetry.Tag{Key: k, Value: v})
}

View File

@ -9,134 +9,51 @@ package tag
import (
"context"
"fmt"
"time"
"golang.org/x/tools/internal/telemetry/worker"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
)
//TODO: Do we need to do something more efficient than just store tags
//TODO: directly on the context?
// Tag holds a key and value pair.
// It is normally used when passing around lists of tags.
type Tag struct {
Key interface{}
Value interface{}
}
// Tagger is the interface to somthing that returns a Tag given a context.
// Both Tag itself and Key support this interface, allowing methods that can
// take either (and other implementations as well)
type Tagger interface {
// Tag returns a Tag potentially using information from the Context.
Tag(context.Context) Tag
Tag(context.Context) telemetry.Tag
}
// List is a way of passing around a collection of key value pairs.
// It is an alternative to the less efficient and unordered method of using
// maps.
type List []Tag
// Observer is the type for a function that wants to be notified when new tags
// are set on a context.
// If you use context.WithValue (or equivalent) it will bypass the observers,
// you must use the setters in this package for tags that should be observed.
// Register new observers with the Observe function.
type Observer func(ctx context.Context, at time.Time, tags List)
// With is roughly equivalent to context.WithValue except that it also notifies
// registered observers.
// Unlike WithValue, it takes a list of tags so that you can set many values
// at once if needed. Each call to With results in one invocation of each
// observer.
func With(ctx context.Context, tags ...Tag) context.Context {
func With(ctx context.Context, tags ...telemetry.Tag) context.Context {
at := time.Now()
for _, t := range tags {
ctx = context.WithValue(ctx, t.Key, t.Value)
}
worker.Do(func() {
for i := len(observers) - 1; i >= 0; i-- {
observers[i](ctx, at, tags)
}
})
export.Tag(ctx, at, tags)
return ctx
}
// Get collects a set of values from the context and returns them as a tag list.
func Get(ctx context.Context, keys ...interface{}) List {
tags := make(List, len(keys))
func Get(ctx context.Context, keys ...interface{}) telemetry.TagList {
tags := make(telemetry.TagList, len(keys))
for i, key := range keys {
tags[i] = Tag{Key: key, Value: ctx.Value(key)}
tags[i] = telemetry.Tag{Key: key, Value: ctx.Value(key)}
}
return tags
}
// Tags collects a list of tags for the taggers from the context.
func Tags(ctx context.Context, taggers ...Tagger) List {
tags := make(List, len(taggers))
func Tags(ctx context.Context, taggers ...Tagger) telemetry.TagList {
tags := make(telemetry.TagList, len(taggers))
for i, t := range taggers {
tags[i] = t.Tag(ctx)
}
return tags
}
var observers = []Observer{}
// Observe adds a new tag observer to the registered set.
// There is no way to ever unregister a observer.
// Observers are free to use context information to control their behavior.
func Observe(observer Observer) {
worker.Do(func() {
observers = append(observers, observer)
})
}
// Format is used for debug printing of tags.
func (t Tag) Format(f fmt.State, r rune) {
fmt.Fprintf(f, `%v="%v"`, t.Key, t.Value)
}
// Get returns the tag unmodified.
// It makes Key conform to the Tagger interface.
func (t Tag) Tag(ctx context.Context) Tag {
return t
}
// Get will get a single key's value from the list.
func (l List) Get(k interface{}) interface{} {
for _, t := range l {
if t.Key == k {
return t.Value
}
}
return nil
}
// Format pretty prints a list.
// It is intended only for debugging.
func (l List) Format(f fmt.State, r rune) {
printed := false
for _, t := range l {
if t.Value == nil {
continue
}
if printed {
fmt.Fprint(f, ",")
}
fmt.Fprint(f, t)
printed = true
}
}
// Equal returns true if two lists are identical.
func (l List) Equal(other List) bool {
//TODO: make this more efficient
return fmt.Sprint(l) == fmt.Sprint(other)
}
// Less is intended only for using tag lists as a sorting key.
func (l List) Less(other List) bool {
//TODO: make this more efficient
return fmt.Sprint(l) < fmt.Sprint(other)
}

View File

@ -0,0 +1,37 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package telemetry
import (
"fmt"
"time"
)
type SpanContext struct {
TraceID TraceID
SpanID SpanID
}
type Span struct {
Name string
ID SpanContext
ParentID SpanID
Start time.Time
Finish time.Time
Tags TagList
Events []Event
}
func (s *SpanContext) Format(f fmt.State, r rune) {
fmt.Fprintf(f, "%v:%v", s.TraceID, s.SpanID)
}
func (s *Span) Format(f fmt.State, r rune) {
fmt.Fprintf(f, "%v %v", s.Name, s.ID)
if s.ParentID.IsValid() {
fmt.Fprintf(f, "[%v]", s.ParentID)
}
fmt.Fprintf(f, " %v->%v", s.Start, s.Finish)
}

View File

@ -7,134 +7,33 @@ package trace
import (
"context"
"fmt"
"time"
"golang.org/x/tools/internal/telemetry/log"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/tag"
"golang.org/x/tools/internal/telemetry/worker"
)
type Span struct {
Name string
TraceID TraceID
SpanID SpanID
ParentID SpanID
Start time.Time
Finish time.Time
Tags tag.List
Events []Event
ready bool
}
type Event struct {
Time time.Time
Tags tag.List
}
type Observer func(*Span)
func RegisterObservers(o ...Observer) {
worker.Do(func() {
if !registered {
registered = true
tag.Observe(tagObserver)
log.AddLogger(logger)
}
observers = append(observers, o...)
})
}
func StartSpan(ctx context.Context, name string, tags ...tag.Tag) (context.Context, func()) {
span := &Span{
Name: name,
Start: time.Now(),
}
if parent := fromContext(ctx); parent != nil {
span.TraceID = parent.TraceID
span.ParentID = parent.SpanID
func StartSpan(ctx context.Context, name string, tags ...telemetry.Tag) (context.Context, func()) {
start := time.Now()
span := &telemetry.Span{Name: name}
if parent := telemetry.GetSpan(ctx); parent != nil {
span.ID.TraceID = parent.ID.TraceID
span.ParentID = parent.ID.SpanID
} else {
span.TraceID = newTraceID()
span.ID.TraceID = telemetry.NewTraceID()
}
span.SpanID = newSpanID()
ctx = context.WithValue(ctx, contextKey, span)
span.ID.SpanID = telemetry.NewSpanID()
ctx = telemetry.WithSpan(ctx, span)
if len(tags) > 0 {
ctx = tag.With(ctx, tags...)
}
worker.Do(func() {
span.ready = true
for _, o := range observers {
o(span)
}
})
return ctx, span.close
}
func (s *Span) close() {
now := time.Now()
worker.Do(func() {
s.Finish = now
for _, o := range observers {
o(s)
}
})
}
func (s *Span) Format(f fmt.State, r rune) {
fmt.Fprintf(f, "%v %v:%v", s.Name, s.TraceID, s.SpanID)
if s.ParentID.IsValid() {
fmt.Fprintf(f, "[%v]", s.ParentID)
}
fmt.Fprintf(f, " %v->%v", s.Start, s.Finish)
}
type contextKeyType int
var contextKey contextKeyType
func fromContext(ctx context.Context) *Span {
v := ctx.Value(contextKey)
if v == nil {
return nil
}
return v.(*Span)
}
var (
observers []Observer
registered bool
)
func tagObserver(ctx context.Context, at time.Time, tags tag.List) {
span := fromContext(ctx)
if span == nil {
return
}
if !span.ready {
span.Tags = append(span.Tags, tags...)
return
}
span.Events = append(span.Events, Event{
Time: at,
Tags: tags,
})
}
func logger(ctx context.Context, at time.Time, tags tag.List) bool {
span := fromContext(ctx)
if span == nil {
return false
}
span.Events = append(span.Events, Event{
Time: at,
Tags: tags,
})
return false
export.StartSpan(ctx, span, start)
return ctx, func() { export.FinishSpan(ctx, span, time.Now()) }
}
// Detach returns a context without an associated span.
// This allows the creation of spans that are not children of the current span.
func Detach(ctx context.Context) context.Context {
return context.WithValue(ctx, contextKey, nil)
return telemetry.WithSpan(ctx, nil)
}