diff --git a/app/vmalert/group.go b/app/vmalert/group.go index ec65ea508..cccd0f460 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -144,7 +144,6 @@ var ( alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`) alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`) - remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`) remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`) ) @@ -255,7 +254,6 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter } if len(tss) > 0 && e.rw != nil { - remoteWriteSent.Add(len(tss)) for _, ts := range tss { if err := e.rw.Push(ts); err != nil { remoteWriteErrors.Inc() diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go index f508fded7..caff50a5c 100644 --- a/app/vmalert/remotewrite/remotewrite.go +++ b/app/vmalert/remotewrite/remotewrite.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/metrics" "github.com/golang/snappy" ) @@ -61,7 +62,7 @@ const ( defaultConcurrency = 4 defaultMaxBatchSize = 1e3 defaultMaxQueueSize = 1e5 - defaultFlushInterval = time.Second + defaultFlushInterval = 5 * time.Second defaultWriteTimeout = 30 * time.Second ) @@ -85,6 +86,9 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) { if cfg.WriteTimeout == 0 { cfg.WriteTimeout = defaultWriteTimeout } + if cfg.Transport == nil { + cfg.Transport = http.DefaultTransport.(*http.Transport).Clone() + } c := &Client{ c: &http.Client{ Timeout: cfg.WriteTimeout, @@ -138,14 +142,11 @@ func (c *Client) Close() error { func (c *Client) run(ctx context.Context) { ticker := time.NewTicker(c.flushInterval) - wr := prompbmarshal.WriteRequest{} + wr := &prompbmarshal.WriteRequest{} shutdown := func() { for ts := range c.input { wr.Timeseries = append(wr.Timeseries, ts) } - if len(wr.Timeseries) < 1 { - return - } lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout) c.flush(lastCtx, wr) cancel() @@ -164,44 +165,82 @@ func (c *Client) run(ctx context.Context) { return case <-ticker.C: c.flush(ctx, wr) - wr = prompbmarshal.WriteRequest{} - case ts := <-c.input: + case ts, ok := <-c.input: + if !ok { + continue + } wr.Timeseries = append(wr.Timeseries, ts) if len(wr.Timeseries) >= c.maxBatchSize { c.flush(ctx, wr) - wr = prompbmarshal.WriteRequest{} } } } }() } -func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) { +var ( + sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`) + sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`) + droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`) + droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`) +) + +// flush is a blocking function that marshals WriteRequest and sends +// it to remote write endpoint. Flush performs limited amount of retries +// if request fails. +func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) { if len(wr.Timeseries) < 1 { return } + defer prompbmarshal.ResetWriteRequest(wr) + data, err := wr.Marshal() if err != nil { logger.Errorf("failed to marshal WriteRequest: %s", err) return } - req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data))) + + const attempts = 5 + b := snappy.Encode(nil, data) + for i := 0; i < attempts; i++ { + err := c.send(ctx, b) + if err == nil { + sentRows.Add(len(wr.Timeseries)) + sentBytes.Add(len(b)) + return + } + + logger.Errorf("attempt %d to send request failed: %s", i+1, err) + // sleeping to avoid remote db hammering + time.Sleep(time.Second) + continue + } + + droppedRows.Add(len(wr.Timeseries)) + droppedBytes.Add(len(b)) + logger.Errorf("all %d attempts to send request failed - dropping %d timeseries", + attempts, len(wr.Timeseries)) +} + +func (c *Client) send(ctx context.Context, data []byte) error { + r := bytes.NewReader(data) + req, err := http.NewRequest("POST", c.addr, r) if err != nil { - logger.Errorf("failed to create new HTTP request: %s", err) - return + return fmt.Errorf("failed to create new HTTP request: %s", err) } if c.baPass != "" { req.SetBasicAuth(c.baUser, c.baPass) } resp, err := c.c.Do(req.WithContext(ctx)) if err != nil { - logger.Errorf("error getting response from %s:%s", req.URL, err) - return + return fmt.Errorf("error while sending request to %s: %s; Data len %d(%d)", + req.URL, err, len(data), r.Size()) } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusNoContent { body, _ := ioutil.ReadAll(resp.Body) - logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body) - return + return fmt.Errorf("unexpected response code %d for %s. Response body %q", + resp.StatusCode, req.URL, body) } + return nil } diff --git a/app/vmalert/remotewrite/remotewrite_test.go b/app/vmalert/remotewrite/remotewrite_test.go new file mode 100644 index 000000000..7dab24712 --- /dev/null +++ b/app/vmalert/remotewrite/remotewrite_test.go @@ -0,0 +1,102 @@ +package remotewrite + +import ( + "context" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/golang/snappy" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestClient_Push(t *testing.T) { + testSrv := newRWServer() + cfg := Config{ + Addr: testSrv.URL, + MaxBatchSize: 100, + } + client, err := NewClient(context.Background(), cfg) + if err != nil { + t.Fatalf("failed to create client: %s", err) + } + const rowsN = 1e4 + var sent int + for i := 0; i < rowsN; i++ { + s := prompbmarshal.TimeSeries{ + Samples: []prompbmarshal.Sample{{ + Value: rand.Float64(), + Timestamp: time.Now().Unix(), + }}, + } + err := client.Push(s) + if err == nil { + sent++ + } + } + if sent == 0 { + t.Fatalf("0 series sent") + } + if err := client.Close(); err != nil { + t.Fatalf("failed to close client: %s", err) + } + got := testSrv.accepted() + if got != sent { + t.Fatalf("expected to have %d series; got %d", sent, got) + } +} + +func newRWServer() *rwServer { + rw := &rwServer{} + rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler)) + return rw +} + +type rwServer struct { + // WARN: ordering of fields is important for alignment! + // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG + acceptedRows uint64 + *httptest.Server +} + +func (rw *rwServer) accepted() int { + return int(atomic.LoadUint64(&rw.acceptedRows)) +} + +func (rw *rwServer) err(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) +} + +func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + rw.err(w, fmt.Errorf("bad method %q", r.Method)) + return + } + data, err := ioutil.ReadAll(r.Body) + if err != nil { + rw.err(w, fmt.Errorf("body read err: %s", err)) + return + } + defer func() { _ = r.Body.Close() }() + + b, err := snappy.Decode(nil, data) + if err != nil { + rw.err(w, fmt.Errorf("decode err: %s", err)) + return + } + wr := &prompb.WriteRequest{} + if err := wr.Unmarshal(b); err != nil { + rw.err(w, fmt.Errorf("unmarhsal err: %s", err)) + return + } + atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries))) + w.WriteHeader(http.StatusNoContent) +}