vmalert: add new metric vmalert_remotewrite_flush_duration_seconds (#1622)

This commit is contained in:
Roman Khavronenko 2021-09-16 14:00:16 +03:00 committed by GitHub
parent f83fa31985
commit b75455c650
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -92,6 +93,10 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Transport == nil { if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone() cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
} }
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
c := &Client{ c := &Client{
c: &http.Client{ c: &http.Client{
Timeout: cfg.WriteTimeout, Timeout: cfg.WriteTimeout,
@ -106,10 +111,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize), input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
disablePathAppend: cfg.DisablePathAppend, disablePathAppend: cfg.DisablePathAppend,
} }
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
for i := 0; i < cc; i++ { for i := 0; i < cc; i++ {
c.run(ctx) c.run(ctx)
} }
@ -186,6 +188,7 @@ var (
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`) sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`) droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`) droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
) )
// flush is a blocking function that marshals WriteRequest and sends // flush is a blocking function that marshals WriteRequest and sends
@ -196,6 +199,7 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
return return
} }
defer prompbmarshal.ResetWriteRequest(wr) defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal() data, err := wr.Marshal()
if err != nil { if err != nil {
@ -237,7 +241,7 @@ func (c *Client) send(ctx context.Context, data []byte) error {
} }
} }
if !c.disablePathAppend { if !c.disablePathAppend {
req.URL.Path += writePath req.URL.Path = path.Join(req.URL.Path, writePath)
} }
resp, err := c.c.Do(req.WithContext(ctx)) resp, err := c.c.Do(req.WithContext(ctx))
if err != nil { if err != nil {