From 4cb3e7595c597e127ac4a7ee76009ce425aed615 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 1 Jul 2020 00:58:26 +0300 Subject: [PATCH] app/vmstorage: add `-denyQueriesOutsideRetention` command-line flag for denying queries outside the configured retention --- app/vmselect/netstorage/netstorage.go | 34 ++++++++++++++++++++------- app/vmstorage/transport/server.go | 25 ++++++++++++++++++++ lib/storage/storage.go | 5 ++++ 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 22885b69e8..937628c8f2 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -2,10 +2,13 @@ package netstorage import ( "container/heap" + "errors" "fmt" "io" + "net/http" "runtime" "sort" + "strings" "sync" "time" @@ -15,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -1147,7 +1151,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC if err := f(bc); err != nil { remoteAddr := bc.RemoteAddr() - if _, ok := err.(*errRemote); ok { + var er *errRemote + if errors.As(err, &er) { // Remote error. The connection may be re-used. Return it to the pool. sn.connPool.Put(bc) } else { @@ -1171,6 +1176,19 @@ func (er *errRemote) Error() string { return er.msg } +func newErrRemote(buf []byte) error { + err := &errRemote{ + msg: string(buf), + } + if !strings.Contains(err.msg, "denyQueriesOutsideRetention") { + return err + } + return &httpserver.ErrorWithStatusCode{ + Err: err, + StatusCode: http.StatusServiceUnavailable, + } +} + func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestData []byte) (int, error) { // Send the request to sn if err := writeBytes(bc, requestData); err != nil { @@ -1186,7 +1204,7 @@ func (sn *storageNode) deleteMetricsOnConn(bc *handshake.BufferedConn, requestDa return 0, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return 0, &errRemote{msg: string(buf)} + return 0, newErrRemote(buf) } // Read deletedCount @@ -1217,7 +1235,7 @@ func (sn *storageNode) getLabelsOnConn(bc *handshake.BufferedConn, accountID, pr return nil, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return nil, &errRemote{msg: string(buf)} + return nil, newErrRemote(buf) } // Read response @@ -1258,7 +1276,7 @@ func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, accountI return nil, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return nil, &errRemote{msg: string(buf)} + return nil, newErrRemote(buf) } // Read response @@ -1303,7 +1321,7 @@ func (sn *storageNode) getLabelEntriesOnConn(bc *handshake.BufferedConn, account return nil, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return nil, &errRemote{msg: string(buf)} + return nil, newErrRemote(buf) } // Read response @@ -1356,7 +1374,7 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac return nil, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return nil, &errRemote{msg: string(buf)} + return nil, newErrRemote(buf) } // Read response @@ -1422,7 +1440,7 @@ func (sn *storageNode) getSeriesCountOnConn(bc *handshake.BufferedConn, accountI return 0, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return 0, &errRemote{msg: string(buf)} + return 0, newErrRemote(buf) } // Read response @@ -1461,7 +1479,7 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc * return 0, fmt.Errorf("cannot read error message: %w", err) } if len(buf) > 0 { - return 0, &errRemote{msg: string(buf)} + return 0, newErrRemote(buf) } // Read response. It may consist of multiple MetricBlocks. diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 7d4adfd26f..c12abea5d0 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "net/http" "sync" "sync/atomic" "time" @@ -14,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -27,6 +29,10 @@ var ( precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss") disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage") + + denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+ + "When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+ + "This may be useful when multiple data sources with distinct retentions are hidden behind query-tee") ) // Server processes connections from vminsert and vmselect. @@ -824,6 +830,9 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error { MinTimestamp: ctx.sq.MinTimestamp, MaxTimestamp: ctx.sq.MaxTimestamp, } + if err := checkTimeRange(s.storage, tr); err != nil { + return ctx.writeErrorMessage(err) + } ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch) defer ctx.sr.MustClose() if err := ctx.sr.Error(); err != nil { @@ -859,6 +868,22 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error { return nil } +// checkTimeRange returns true if the given tr is denied for querying. +func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error { + if !*denyQueriesOutsideRetention { + return nil + } + retentionPeriod := s.RetentionMonths() + minAllowedTimestamp := (int64(fasttime.UnixTimestamp()) - int64(retentionPeriod)*3600*24*30) * 1000 + if tr.MinTimestamp > minAllowedTimestamp { + return nil + } + return &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("the given time range %s is outside the allowed retention of %d months according to -denyQueriesOutsideRetention", &tr, retentionPeriod), + StatusCode: http.StatusServiceUnavailable, + } +} + var ( vmselectDeleteMetricsRequests = metrics.NewCounter("vm_vmselect_delete_metrics_requests_total") vmselectLabelsRequests = metrics.NewCounter("vm_vmselect_labels_requests_total") diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 0215ced6c4..9d33d890ff 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -200,6 +200,11 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { return s, nil } +// RetentionMonths returns retention months for s. +func (s *Storage) RetentionMonths() int { + return s.retentionMonths +} + // debugFlush flushes recently added storage data, so it becomes visible to search. func (s *Storage) debugFlush() { s.tb.flushRawRows()