1
0
mirror of https://github.com/golang/go synced 2024-10-01 01:38:33 -06:00

internal/telemetry: pass the http.Client to the ocagent

This will allow us to configure the connection at need.
It will also allow us to intercept the content for tests.

Change-Id: Id7d34f2d56f233eae112bea97cccab1f2a88de55
Reviewed-on: https://go-review.googlesource.com/c/tools/+/190798
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
This commit is contained in:
Ian Cottrell 2019-08-15 22:43:07 -04:00 committed by Ian Cottrell
parent 547ecf7b1e
commit fa80cb575d
2 changed files with 67 additions and 32 deletions

View File

@ -113,7 +113,10 @@ gopls flags are:
// If no arguments are passed it will invoke the server sub command, as a
// temporary measure for compatibility.
func (app *Application) Run(ctx context.Context, args ...string) error {
export.AddExporters(ocagent.Connect(app.name, app.OCAgent))
ocConfig := ocagent.Discover()
//TODO: we should not need to adjust the discovered configuration
ocConfig.Address = app.OCAgent
export.AddExporters(ocagent.Connect(ocConfig))
app.Serve.app = app
if len(args) == 0 {
tool.Main(ctx, &app.Serve, args)

View File

@ -14,6 +14,7 @@ import (
"fmt"
"net/http"
"os"
"path/filepath"
"sync"
"time"
@ -23,12 +24,28 @@ import (
"golang.org/x/tools/internal/telemetry/tag"
)
const DefaultAddress = "http://localhost:55678"
const exportRate = 2 * time.Second
type Config struct {
Start time.Time
Host string
Process uint32
Client *http.Client
Service string
Address string
Rate time.Duration
}
// Discover finds the local agent to export to, it will return nil if there
// is not one running.
// TODO: Actually implement a discovery protocol rather than a hard coded address
func Discover() *Config {
return &Config{
Address: "http://localhost:55678",
}
}
type exporter struct {
mu sync.Mutex
address string
config Config
node *wire.Node
spans []*wire.Span
metrics []*wire.Metric
@ -37,35 +54,48 @@ type exporter struct {
// Connect creates a process specific exporter with the specified
// serviceName and the address of the ocagent to which it will upload
// its telemetry.
func Connect(service, address string) export.Exporter {
if address == "off" {
func Connect(config *Config) export.Exporter {
if config == nil || config.Address == "off" {
return nil
}
hostname, _ := os.Hostname()
exporter := &exporter{
address: address,
node: &wire.Node{
Identifier: &wire.ProcessIdentifier{
HostName: hostname,
Pid: uint32(os.Getpid()),
StartTimestamp: convertTimestamp(time.Now()),
},
LibraryInfo: &wire.LibraryInfo{
Language: wire.LanguageGo,
ExporterVersion: "0.0.1",
CoreLibraryVersion: "x/tools",
},
ServiceInfo: &wire.ServiceInfo{
Name: service,
},
exporter := &exporter{config: *config}
if exporter.config.Start.IsZero() {
exporter.config.Start = time.Now()
}
if exporter.config.Host == "" {
hostname, _ := os.Hostname()
exporter.config.Host = hostname
}
if exporter.config.Process == 0 {
exporter.config.Process = uint32(os.Getpid())
}
if exporter.config.Client == nil {
exporter.config.Client = http.DefaultClient
}
if exporter.config.Service == "" {
exporter.config.Service = filepath.Base(os.Args[0])
}
if exporter.config.Rate == 0 {
exporter.config.Rate = 2 * time.Second
}
exporter.node = &wire.Node{
Identifier: &wire.ProcessIdentifier{
HostName: exporter.config.Host,
Pid: exporter.config.Process,
StartTimestamp: convertTimestamp(exporter.config.Start),
},
LibraryInfo: &wire.LibraryInfo{
Language: wire.LanguageGo,
ExporterVersion: "0.0.1",
CoreLibraryVersion: "x/tools",
},
ServiceInfo: &wire.ServiceInfo{
Name: exporter.config.Service,
},
}
if exporter.address == "" {
exporter.address = DefaultAddress
}
go func() {
for _ = range time.Tick(exportRate) {
exporter.flush()
for _ = range time.Tick(exporter.config.Rate) {
exporter.Flush()
}
}()
return exporter
@ -87,7 +117,7 @@ func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
e.metrics = append(e.metrics, convertMetric(data))
}
func (e *exporter) flush() {
func (e *exporter) Flush() {
e.mu.Lock()
defer e.mu.Unlock()
spans := e.spans
@ -117,19 +147,21 @@ func (e *exporter) send(endpoint string, message interface{}) {
errorInExport("ocagent failed to marshal message for %v: %v", endpoint, err)
return
}
uri := e.address + endpoint
uri := e.config.Address + endpoint
req, err := http.NewRequest("POST", uri, bytes.NewReader(blob))
if err != nil {
errorInExport("ocagent failed to build request for %v: %v", uri, err)
return
}
req.Header.Set("Content-Type", "application/json")
res, err := http.DefaultClient.Do(req)
res, err := e.config.Client.Do(req)
if err != nil {
errorInExport("ocagent failed to send message: %v \n", err)
return
}
res.Body.Close()
if res.Body != nil {
res.Body.Close()
}
return
}