diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go index eaac3a4bb..30d948224 100644 --- a/app/vmalert/remotewrite/remotewrite.go +++ b/app/vmalert/remotewrite/remotewrite.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" + "path" "strings" "sync" "time" @@ -92,6 +93,10 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) { if cfg.Transport == nil { cfg.Transport = http.DefaultTransport.(*http.Transport).Clone() } + cc := defaultConcurrency + if cfg.Concurrency > 0 { + cc = cfg.Concurrency + } c := &Client{ c: &http.Client{ Timeout: cfg.WriteTimeout, @@ -106,10 +111,7 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) { input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize), disablePathAppend: cfg.DisablePathAppend, } - cc := defaultConcurrency - if cfg.Concurrency > 0 { - cc = cfg.Concurrency - } + for i := 0; i < cc; i++ { c.run(ctx) } @@ -182,10 +184,11 @@ func (c *Client) run(ctx context.Context) { } var ( - sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`) - sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`) - droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`) - droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`) + sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`) + sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`) + droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`) + droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`) + bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`) ) // flush is a blocking function that marshals WriteRequest and sends @@ -196,6 +199,7 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) { return } defer prompbmarshal.ResetWriteRequest(wr) + defer bufferFlushDuration.UpdateDuration(time.Now()) data, err := wr.Marshal() if err != nil { @@ -237,7 +241,7 @@ func (c *Client) send(ctx context.Context, data []byte) error { } } if !c.disablePathAppend { - req.URL.Path += writePath + req.URL.Path = path.Join(req.URL.Path, writePath) } resp, err := c.c.Do(req.WithContext(ctx)) if err != nil {