mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vmalert: follow-up bc37b279aa
* rm extra interface method for rw Client, as it has low applicability and doesn't fit multitenancy well * add `GetDroppedRows` method instead Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
1e83598be3
commit
4ef76eed7b
4 changed files with 9 additions and 15 deletions
|
@ -148,11 +148,6 @@ func (c *Client) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseAndReport closes the client and report dropped rows
|
|
||||||
func (c *Client) CloseAndReport() (int64, error) {
|
|
||||||
return int64(droppedRows.Get()), c.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) run(ctx context.Context) {
|
func (c *Client) run(ctx context.Context) {
|
||||||
ticker := time.NewTicker(c.flushInterval)
|
ticker := time.NewTicker(c.flushInterval)
|
||||||
wr := &prompbmarshal.WriteRequest{}
|
wr := &prompbmarshal.WriteRequest{}
|
||||||
|
@ -217,6 +212,9 @@ var (
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GetDroppedRows returns value of droppedRows metric
|
||||||
|
func GetDroppedRows() int64 { return int64(droppedRows.Get()) }
|
||||||
|
|
||||||
// flush is a blocking function that marshals WriteRequest and sends
|
// flush is a blocking function that marshals WriteRequest and sends
|
||||||
// it to remote-write endpoint. Flush performs limited amount of retries
|
// it to remote-write endpoint. Flush performs limited amount of retries
|
||||||
// if request fails.
|
// if request fails.
|
||||||
|
|
|
@ -60,11 +60,6 @@ func (c *DebugClient) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseAndReport closes the client and reports dropped rows
|
|
||||||
func (c *DebugClient) CloseAndReport() (int64, error) {
|
|
||||||
return 0, c.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *DebugClient) send(data []byte) error {
|
func (c *DebugClient) send(data []byte) error {
|
||||||
b := snappy.Encode(nil, data)
|
b := snappy.Encode(nil, data)
|
||||||
r := bytes.NewReader(b)
|
r := bytes.NewReader(b)
|
||||||
|
|
|
@ -10,6 +10,4 @@ type RWClient interface {
|
||||||
Push(s prompbmarshal.TimeSeries) error
|
Push(s prompbmarshal.TimeSeries) error
|
||||||
// Close stops the client. Client can't be reused after Close call.
|
// Close stops the client. Client can't be reused after Close call.
|
||||||
Close() error
|
Close() error
|
||||||
// CloseAndReport stops the client and reports dropped rows
|
|
||||||
CloseAndReport() (int64, error)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,9 +69,12 @@ func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewri
|
||||||
total += ng.Replay(tFrom, tTo, rw, *replayMaxDatapoints, *replayRuleRetryAttempts, *replayRulesDelay, *disableProgressBar)
|
total += ng.Replay(tFrom, tTo, rw, *replayMaxDatapoints, *replayRuleRetryAttempts, *replayRulesDelay, *disableProgressBar)
|
||||||
}
|
}
|
||||||
logger.Infof("replay evaluation finished, generated %d samples", total)
|
logger.Infof("replay evaluation finished, generated %d samples", total)
|
||||||
droppedRows, err := rw.CloseAndReport()
|
if rw != nil {
|
||||||
if err == nil && droppedRows != 0 {
|
return rw.Close()
|
||||||
|
}
|
||||||
|
droppedRows := remotewrite.GetDroppedRows()
|
||||||
|
if droppedRows > 0 {
|
||||||
return fmt.Errorf("failed to push all generated samples to remote write url, dropped %d samples out of %d", droppedRows, total)
|
return fmt.Errorf("failed to push all generated samples to remote write url, dropped %d samples out of %d", droppedRows, total)
|
||||||
}
|
}
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue