mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmalert: respect batch size limit for remote write on shutdown (#6039)
During shutdown period of vmalert, remotewrite client retrieve all pending time series from buffer queue, compose them into 1 batch and execute remote write. This final batch may exceed the limit of -remoteWrite.maxBatchSize, and be rejected by the receiver (gateway, vmcluster or others). This changes ensures that even during shutdown vmalert won't exceed the max batch size limit for remote write destination. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025
This commit is contained in:
parent
b6bd9a97a3
commit
623d257faf
3 changed files with 101 additions and 2 deletions
|
@ -151,12 +151,22 @@ func (c *Client) run(ctx context.Context) {
|
|||
ticker := time.NewTicker(c.flushInterval)
|
||||
wr := &prompbmarshal.WriteRequest{}
|
||||
shutdown := func() {
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||
logger.Infof("shutting down remote write client and flushing remained series")
|
||||
|
||||
shutdownFlushCnt := 0
|
||||
for ts := range c.input {
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
shutdownFlushCnt += len(wr.Timeseries)
|
||||
c.flush(lastCtx, wr)
|
||||
}
|
||||
}
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
|
||||
// flush the last batch. `flush` will re-check and avoid flushing empty batch.
|
||||
shutdownFlushCnt += len(wr.Timeseries)
|
||||
c.flush(lastCtx, wr)
|
||||
|
||||
logger.Infof("shutting down remote write client flushed %d series", shutdownFlushCnt)
|
||||
cancel()
|
||||
}
|
||||
c.wg.Add(1)
|
||||
|
|
|
@ -84,6 +84,70 @@ func TestClient_Push(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClient_run_maxBatchSizeDuringShutdown(t *testing.T) {
|
||||
batchSize := 20
|
||||
|
||||
testTable := []struct {
|
||||
name string // name of the test case
|
||||
pushCnt int // how many time series is pushed to the client
|
||||
batchCnt int // the expected batch count sent by the client
|
||||
}{
|
||||
{
|
||||
name: "pushCnt % batchSize == 0",
|
||||
pushCnt: batchSize * 40,
|
||||
batchCnt: 40,
|
||||
},
|
||||
{
|
||||
name: "pushCnt % batchSize != 0",
|
||||
pushCnt: batchSize*40 + 1,
|
||||
batchCnt: 40 + 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testTable {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// run new server
|
||||
bcServer := newBatchCntRWServer()
|
||||
|
||||
// run new client
|
||||
rwClient, err := NewClient(context.Background(), Config{
|
||||
MaxBatchSize: batchSize,
|
||||
|
||||
// Set everything to 1 to simplify the calculation.
|
||||
Concurrency: 1,
|
||||
MaxQueueSize: 1000,
|
||||
FlushInterval: time.Minute,
|
||||
|
||||
// batch count server
|
||||
Addr: bcServer.URL,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("new remote write client failed, err: %v", err)
|
||||
}
|
||||
|
||||
// push time series to the client.
|
||||
for i := 0; i < tt.pushCnt; i++ {
|
||||
if err = rwClient.Push(prompbmarshal.TimeSeries{}); err != nil {
|
||||
t.Fatalf("push time series to the client failed, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// close the client so the rest ts will be flushed in `shutdown`
|
||||
if err = rwClient.Close(); err != nil {
|
||||
t.Fatalf("shutdown client failed, err: %v", err)
|
||||
}
|
||||
|
||||
// finally check how many batches is sent.
|
||||
if tt.batchCnt != bcServer.acceptedBatches() {
|
||||
t.Errorf("client sent batch count incorrect, want: %d, get: %d", tt.batchCnt, bcServer.acceptedBatches())
|
||||
}
|
||||
if tt.pushCnt != bcServer.accepted() {
|
||||
t.Errorf("client sent time series count incorrect, want: %d, get: %d", tt.pushCnt, bcServer.accepted())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newRWServer() *rwServer {
|
||||
rw := &rwServer{}
|
||||
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
|
||||
|
@ -184,3 +248,27 @@ func (frw *faultyRWServer) handler(w http.ResponseWriter, r *http.Request) {
|
|||
w.Write([]byte("server overloaded"))
|
||||
}
|
||||
}
|
||||
|
||||
type batchCntRWServer struct {
|
||||
*rwServer
|
||||
|
||||
batchCnt atomic.Int64 // accepted batch count, which also equals to request count
|
||||
}
|
||||
|
||||
func newBatchCntRWServer() *batchCntRWServer {
|
||||
bc := &batchCntRWServer{
|
||||
rwServer: &rwServer{},
|
||||
}
|
||||
|
||||
bc.Server = httptest.NewServer(http.HandlerFunc(bc.handler))
|
||||
return bc
|
||||
}
|
||||
|
||||
func (bc *batchCntRWServer) handler(w http.ResponseWriter, r *http.Request) {
|
||||
bc.batchCnt.Add(1)
|
||||
bc.rwServer.handler(w, r)
|
||||
}
|
||||
|
||||
func (bc *batchCntRWServer) acceptedBatches() int {
|
||||
return int(bc.batchCnt.Load())
|
||||
}
|
||||
|
|
|
@ -75,6 +75,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* BUGFIX: properly wait for force merge to be completed during the shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for the details.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): respect `-remoteWrite.maxBatchSize` at shutdown period. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025).
|
||||
|
||||
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)
|
||||
|
||||
|
|
Loading…
Reference in a new issue