mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmstorage: add -denyQueriesOutsideRetention
command-line flag for denying queries outside the configured retention
This commit is contained in:
parent
81e3d4305f
commit
4cb3e7595c
3 changed files with 56 additions and 8 deletions
|
@ -2,10 +2,13 @@ package netstorage
|
|||
|
||||
import (
|
||||
"container/heap"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -15,6 +18,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
|
@ -1147,7 +1151,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
|
|||
|
||||
if err := f(bc); err != nil {
|
||||
remoteAddr := bc.RemoteAddr()
|
||||
if _, ok := err.(*errRemote); ok {
|
||||
var er *errRemote
|
||||
if errors.As(err, &er) {
|
||||
// Remote error. The connection may be re-used. Return it to the pool.
|
||||
sn.connPool.Put(bc)
|
||||
} else {
|
||||
|
@ -1171,6 +1176,19 @@ func (er *errRemote) Error() string {
|
|||
return er.msg
|
||||
}
|
||||
|
||||
func newErrRemote(buf []byte) error {
|
||||
err := &errRemote{
|
||||
msg: string(buf),
|
||||
}
|
||||
if !strings.Contains(err.msg, "denyQueriesOutsideRetention") {
|
||||
return err
|
||||
}
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
Err: err,
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
}
|
||||
}
|
||||
|
||||
func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestData []byte) (int, error) {
|
||||
// Send the request to sn
|
||||
if err := writeBytes(bc, requestData); err != nil {
|
||||
|
@ -1186,7 +1204,7 @@ func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestDa
|
|||
return 0, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return 0, &errRemote{msg: string(buf)}
|
||||
return 0, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read deletedCount
|
||||
|
@ -1217,7 +1235,7 @@ func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, pr
|
|||
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return nil, &errRemote{msg: string(buf)}
|
||||
return nil, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response
|
||||
|
@ -1258,7 +1276,7 @@ func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountI
|
|||
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return nil, &errRemote{msg: string(buf)}
|
||||
return nil, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response
|
||||
|
@ -1303,7 +1321,7 @@ func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, account
|
|||
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return nil, &errRemote{msg: string(buf)}
|
||||
return nil, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response
|
||||
|
@ -1356,7 +1374,7 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac
|
|||
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return nil, &errRemote{msg: string(buf)}
|
||||
return nil, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response
|
||||
|
@ -1422,7 +1440,7 @@ func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountI
|
|||
return 0, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return 0, &errRemote{msg: string(buf)}
|
||||
return 0, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response
|
||||
|
@ -1461,7 +1479,7 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *
|
|||
return 0, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return 0, &errRemote{msg: string(buf)}
|
||||
return 0, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response. It may consist of multiple MetricBlocks.
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
|
@ -27,6 +29,10 @@ var (
|
|||
|
||||
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
|
||||
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
|
||||
|
||||
denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+
|
||||
"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+
|
||||
"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee")
|
||||
)
|
||||
|
||||
// Server processes connections from vminsert and vmselect.
|
||||
|
@ -824,6 +830,9 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
|
|||
MinTimestamp: ctx.sq.MinTimestamp,
|
||||
MaxTimestamp: ctx.sq.MaxTimestamp,
|
||||
}
|
||||
if err := checkTimeRange(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch)
|
||||
defer ctx.sr.MustClose()
|
||||
if err := ctx.sr.Error(); err != nil {
|
||||
|
@ -859,6 +868,22 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// checkTimeRange returns true if the given tr is denied for querying.
|
||||
func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
|
||||
if !*denyQueriesOutsideRetention {
|
||||
return nil
|
||||
}
|
||||
retentionPeriod := s.RetentionMonths()
|
||||
minAllowedTimestamp := (int64(fasttime.UnixTimestamp()) - int64(retentionPeriod)*3600*24*30) * 1000
|
||||
if tr.MinTimestamp > minAllowedTimestamp {
|
||||
return nil
|
||||
}
|
||||
return &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf("the given time range %s is outside the allowed retention of %d months according to -denyQueriesOutsideRetention", &tr, retentionPeriod),
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total")
|
||||
vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total")
|
||||
|
|
|
@ -200,6 +200,11 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
// RetentionMonths returns retention months for s.
|
||||
func (s *Storage) RetentionMonths() int {
|
||||
return s.retentionMonths
|
||||
}
|
||||
|
||||
// debugFlush flushes recently added storage data, so it becomes visible to search.
|
||||
func (s *Storage) debugFlush() {
|
||||
s.tb.flushRawRows()
|
||||
|
|
Loading…
Reference in a new issue