1
0
mirror of https://github.com/golang/go synced 2024-11-05 14:46:11 -07:00

internal/telemetry/export/ocagent: attach timestamps to metrics

This change attaches start timestamps to timeseries and end
timestamps to the points in each timeseries. Int64Data,
Float64Data, HistogramInt64Data, and HistogramFloat64Data
have also had an EndTime field added to keep track of the last
time the metric was updated.

What works:
* Start and end timestamps will now be attached to timeseries.

What does not work yet:
* MetricDescriptors will not have a unit attached.
* No labels will be attached to timeseries.
* Distributions will not have SumOfSquaredDeviation attached.

Updates golang/go#33819

Change-Id: I692e1676bb1e31de26c1f799b96428fc9a55d6c7
Reviewed-on: https://go-review.googlesource.com/c/tools/+/203060
Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
This commit is contained in:
Nathan Dias 2019-10-23 19:34:23 -05:00 committed by Emmanuel Odeke
parent 7b6f5d95f3
commit be03d4f470
4 changed files with 102 additions and 39 deletions

View File

@ -6,6 +6,7 @@ package ocagent
import (
"fmt"
"time"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export/ocagent/wire"
@ -95,17 +96,18 @@ func dataToMetricDescriptorType(data telemetry.MetricData) wire.MetricDescriptor
// dataToTimeseries returns a slice of *wire.TimeSeries based on the
// points in data.
func dataToTimeseries(data telemetry.MetricData) []*wire.TimeSeries {
func dataToTimeseries(data telemetry.MetricData, start time.Time) []*wire.TimeSeries {
if data == nil {
return nil
}
numRows := numRows(data)
startTimestamp := convertTimestamp(start)
timeseries := make([]*wire.TimeSeries, 0, numRows)
for i := 0; i < numRows; i++ {
timeseries = append(timeseries, &wire.TimeSeries{
// TODO: attach StartTimestamp
StartTimestamp: &startTimestamp,
// TODO: labels?
Points: dataToPoints(data, i),
})
@ -135,21 +137,23 @@ func numRows(data telemetry.MetricData) int {
func dataToPoints(data telemetry.MetricData, i int) []*wire.Point {
switch d := data.(type) {
case *metric.Int64Data:
timestamp := convertTimestamp(*d.EndTime)
return []*wire.Point{
{
Value: wire.PointInt64Value{
Int64Value: d.Rows[i],
},
// TODO: attach Timestamp
Timestamp: &timestamp,
},
}
case *metric.Float64Data:
timestamp := convertTimestamp(*d.EndTime)
return []*wire.Point{
{
Value: wire.PointDoubleValue{
DoubleValue: d.Rows[i],
},
// TODO: attach Timestamp
Timestamp: &timestamp,
},
}
case *metric.HistogramInt64Data:
@ -158,10 +162,10 @@ func dataToPoints(data telemetry.MetricData, i int) []*wire.Point {
for i, val := range d.Info.Buckets {
bucketBounds[i] = float64(val)
}
return distributionToPoints(row.Values, row.Count, float64(row.Sum), bucketBounds)
return distributionToPoints(row.Values, row.Count, float64(row.Sum), bucketBounds, *d.EndTime)
case *metric.HistogramFloat64Data:
row := d.Rows[i]
return distributionToPoints(row.Values, row.Count, row.Sum, d.Info.Buckets)
return distributionToPoints(row.Values, row.Count, row.Sum, d.Info.Buckets, *d.EndTime)
}
return nil
@ -170,13 +174,14 @@ func dataToPoints(data telemetry.MetricData, i int) []*wire.Point {
// distributionToPoints returns an array of *wire.Points containing a
// wire.PointDistributionValue representing a distribution with the
// supplied counts, count, and sum.
func distributionToPoints(counts []int64, count int64, sum float64, bucketBounds []float64) []*wire.Point {
func distributionToPoints(counts []int64, count int64, sum float64, bucketBounds []float64, end time.Time) []*wire.Point {
buckets := make([]*wire.Bucket, len(counts))
for i := 0; i < len(counts); i++ {
buckets[i] = &wire.Bucket{
Count: counts[i],
}
}
timestamp := convertTimestamp(end)
return []*wire.Point{
{
Value: wire.PointDistributionValue{
@ -190,6 +195,7 @@ func distributionToPoints(counts []int64, count int64, sum float64, bucketBounds
},
},
},
Timestamp: &timestamp,
},
}
}

View File

@ -3,6 +3,7 @@ package ocagent
import (
"reflect"
"testing"
"time"
"golang.org/x/tools/internal/telemetry"
"golang.org/x/tools/internal/telemetry/export/ocagent/wire"
@ -318,14 +319,22 @@ func TestDataToMetricDescriptorType(t *testing.T) {
}
func TestDataToTimeseries(t *testing.T) {
epoch := time.Unix(0, 0)
epochTimestamp := convertTimestamp(epoch)
end := time.Unix(30, 0)
endTimestamp := convertTimestamp(end)
tests := []struct {
name string
data telemetry.MetricData
start time.Time
want []*wire.TimeSeries
}{
{
"nil data",
nil,
time.Time{},
nil,
},
{
@ -336,28 +345,36 @@ func TestDataToTimeseries(t *testing.T) {
2,
3,
},
EndTime: &end,
},
epoch,
[]*wire.TimeSeries{
&wire.TimeSeries{
Points: []*wire.Point{
&wire.Point{
Value: wire.PointInt64Value{Int64Value: 1},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
&wire.TimeSeries{
Points: []*wire.Point{
&wire.Point{
Value: wire.PointInt64Value{Int64Value: 2},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
&wire.TimeSeries{
Points: []*wire.Point{
&wire.Point{
Value: wire.PointInt64Value{Int64Value: 3},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
},
},
@ -368,21 +385,27 @@ func TestDataToTimeseries(t *testing.T) {
1.5,
4.5,
},
EndTime: &end,
},
epoch,
[]*wire.TimeSeries{
&wire.TimeSeries{
Points: []*wire.Point{
&wire.Point{
Value: wire.PointDoubleValue{DoubleValue: 1.5},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
&wire.TimeSeries{
Points: []*wire.Point{
&wire.Point{
Value: wire.PointDoubleValue{DoubleValue: 4.5},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
},
},
@ -405,7 +428,9 @@ func TestDataToTimeseries(t *testing.T) {
0, 5, 10,
},
},
EndTime: &end,
},
epoch,
[]*wire.TimeSeries{
&wire.TimeSeries{
Points: []*wire.Point{
@ -432,8 +457,10 @@ func TestDataToTimeseries(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
},
},
@ -455,7 +482,9 @@ func TestDataToTimeseries(t *testing.T) {
0, 5,
},
},
EndTime: &end,
},
epoch,
[]*wire.TimeSeries{
&wire.TimeSeries{
Points: []*wire.Point{
@ -479,8 +508,10 @@ func TestDataToTimeseries(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
StartTimestamp: &epochTimestamp,
},
},
},
@ -488,7 +519,7 @@ func TestDataToTimeseries(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := dataToTimeseries(tt.data)
got := dataToTimeseries(tt.data, tt.start)
if !reflect.DeepEqual(got, tt.want) {
t.Fatalf("Got:\n%s\nWant:\n%s", marshaled(got), marshaled(tt.want))
}
@ -559,11 +590,15 @@ func TestNumRows(t *testing.T) {
}
func TestDataToPoints(t *testing.T) {
end := time.Unix(30, 0)
endTimestamp := convertTimestamp(end)
int64Data := &metric.Int64Data{
Rows: []int64{
0,
10,
},
EndTime: &end,
}
float64Data := &metric.Float64Data{
@ -571,6 +606,7 @@ func TestDataToPoints(t *testing.T) {
0.5,
0.25,
},
EndTime: &end,
}
histogramInt64Data := &metric.HistogramInt64Data{
@ -599,6 +635,7 @@ func TestDataToPoints(t *testing.T) {
0, 5, 10,
},
},
EndTime: &end,
}
histogramFloat64Data := &metric.HistogramFloat64Data{
@ -627,6 +664,7 @@ func TestDataToPoints(t *testing.T) {
0, 5, 10,
},
},
EndTime: &end,
}
tests := []struct {
@ -650,6 +688,7 @@ func TestDataToPoints(t *testing.T) {
Value: wire.PointInt64Value{
Int64Value: 0,
},
Timestamp: &endTimestamp,
},
},
},
@ -662,6 +701,7 @@ func TestDataToPoints(t *testing.T) {
Value: wire.PointInt64Value{
Int64Value: 10,
},
Timestamp: &endTimestamp,
},
},
},
@ -674,6 +714,7 @@ func TestDataToPoints(t *testing.T) {
Value: wire.PointDoubleValue{
DoubleValue: 0.5,
},
Timestamp: &endTimestamp,
},
},
},
@ -686,6 +727,7 @@ func TestDataToPoints(t *testing.T) {
Value: wire.PointDoubleValue{
DoubleValue: 0.25,
},
Timestamp: &endTimestamp,
},
},
},
@ -717,6 +759,7 @@ func TestDataToPoints(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
},
@ -748,6 +791,7 @@ func TestDataToPoints(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
},
@ -779,6 +823,7 @@ func TestDataToPoints(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
},
@ -810,6 +855,7 @@ func TestDataToPoints(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
},
@ -826,12 +872,16 @@ func TestDataToPoints(t *testing.T) {
}
func TestDistributionToPoints(t *testing.T) {
end := time.Unix(30, 0)
endTimestamp := convertTimestamp(end)
tests := []struct {
name string
counts []int64
count int64
sum float64
buckets []float64
end time.Time
want []*wire.Point
}{
{
@ -846,6 +896,7 @@ func TestDistributionToPoints(t *testing.T) {
buckets: []float64{
0, 5, 10,
},
end: end,
want: []*wire.Point{
{
Value: wire.PointDistributionValue{
@ -871,6 +922,7 @@ func TestDistributionToPoints(t *testing.T) {
},
},
},
Timestamp: &endTimestamp,
},
},
},
@ -878,7 +930,7 @@ func TestDistributionToPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := distributionToPoints(tt.counts, tt.count, tt.sum, tt.buckets)
got := distributionToPoints(tt.counts, tt.count, tt.sum, tt.buckets, tt.end)
if !reflect.DeepEqual(got, tt.want) {
t.Fatalf("Got:\n%s\nWant:\n%s", marshaled(got), marshaled(tt.want))
}

View File

@ -114,7 +114,7 @@ func (e *exporter) Log(context.Context, telemetry.Event) {}
func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
e.mu.Lock()
defer e.mu.Unlock()
e.metrics = append(e.metrics, convertMetric(data))
e.metrics = append(e.metrics, convertMetric(data, e.config.Start))
}
func (e *exporter) Flush() {
@ -202,9 +202,9 @@ func convertSpan(span *telemetry.Span) *wire.Span {
return result
}
func convertMetric(data telemetry.MetricData) *wire.Metric {
func convertMetric(data telemetry.MetricData, start time.Time) *wire.Metric {
descriptor := dataToMetricDescriptor(data)
timeseries := dataToTimeseries(data)
timeseries := dataToTimeseries(data, start)
if descriptor == nil && timeseries == nil {
return nil

View File

@ -131,6 +131,8 @@ type Int64Data struct {
IsGauge bool
// Rows holds the per group values for the metric.
Rows []int64
// End is the last time this metric was updated.
EndTime *time.Time
groups []telemetry.TagList
}
@ -143,6 +145,8 @@ type Float64Data struct {
IsGauge bool
// Rows holds the per group values for the metric.
Rows []float64
// End is the last time this metric was updated.
EndTime *time.Time
groups []telemetry.TagList
}
@ -153,6 +157,8 @@ type HistogramInt64Data struct {
Info *HistogramInt64
// Rows holds the per group values for the metric.
Rows []*HistogramInt64Row
// End is the last time this metric was updated.
EndTime *time.Time
groups []telemetry.TagList
}
@ -177,6 +183,8 @@ type HistogramFloat64Data struct {
Info *HistogramFloat64
// Rows holds the per group values for the metric.
Rows []*HistogramFloat64Row
// End is the last time this metric was updated.
EndTime *time.Time
groups []telemetry.TagList
}
@ -215,7 +223,7 @@ func getGroup(ctx context.Context, g *[]telemetry.TagList, keys []interface{}) (
func (data *Int64Data) Handle() string { return data.Info.Name }
func (data *Int64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
func (data *Int64Data) modify(ctx context.Context, at time.Time, f func(v int64) int64) {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
@ -227,34 +235,31 @@ func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
copy(data.Rows, old)
}
data.Rows[index] = f(data.Rows[index])
data.EndTime = &at
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *Int64Data) countInt64(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v int64) int64 { return v + 1 })
data.modify(ctx, at, func(v int64) int64 { return v + 1 })
}
func (data *Int64Data) countFloat64(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v int64) int64 { return v + 1 })
data.modify(ctx, at, func(v int64) int64 { return v + 1 })
}
func (data *Int64Data) sum(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v int64) int64 { return v + value })
data.modify(ctx, at, func(v int64) int64 { return v + value })
}
func (data *Int64Data) latest(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v int64) int64 { return value })
data.modify(ctx, at, func(v int64) int64 { return value })
}
func (data *Float64Data) Handle() string { return data.Info.Name }
func (data *Float64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64) {
func (data *Float64Data) modify(ctx context.Context, at time.Time, f func(v float64) float64) {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
if insert {
@ -266,23 +271,23 @@ func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64)
copy(data.Rows, old)
}
data.Rows[index] = f(data.Rows[index])
data.EndTime = &at
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *Float64Data) sum(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
data.modify(ctx, func(v float64) float64 { return v + value })
data.modify(ctx, at, func(v float64) float64 { return v + value })
}
func (data *Float64Data) latest(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v float64) float64 { return value })
data.modify(ctx, at, func(v float64) float64 { return value })
}
func (data *HistogramInt64Data) Handle() string { return data.Info.Name }
func (data *HistogramInt64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramInt64Row)) {
func (data *HistogramInt64Data) modify(ctx context.Context, at time.Time, f func(v *HistogramInt64Row)) {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramInt64Row
@ -300,13 +305,13 @@ func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramI
copy(v.Values, oldValues)
f(&v)
data.Rows[index] = &v
data.EndTime = &at
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v *HistogramInt64Row) {
data.modify(ctx, at, func(v *HistogramInt64Row) {
v.Sum += value
if v.Min > value || v.Count == 0 {
v.Min = value
@ -326,7 +331,7 @@ func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64
func (data *HistogramFloat64Data) Handle() string { return data.Info.Name }
func (data *HistogramFloat64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *HistogramFloat64Row)) {
func (data *HistogramFloat64Data) modify(ctx context.Context, at time.Time, f func(v *HistogramFloat64Row)) {
index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
old := data.Rows
var v HistogramFloat64Row
@ -344,13 +349,13 @@ func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *Histogra
copy(v.Values, oldValues)
f(&v)
data.Rows[index] = &v
data.EndTime = &at
frozen := *data
export.Metric(ctx, &frozen)
}
func (data *HistogramFloat64Data) record(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
// TODO: Use at.
data.modify(ctx, func(v *HistogramFloat64Row) {
data.modify(ctx, at, func(v *HistogramFloat64Row) {
v.Sum += value
if v.Min > value || v.Count == 0 {
v.Min = value