app/vmselect/netstorage: remove unused auth.Token arg

Aliaksandr Valialkin 2022-07-06 00:11:59 +03:00
parent 78eeca6f0d
commit f4df43f7cc
5 changed files with 51 additions and 59 deletions
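
In short: most netstorage query functions received an at *auth.Token argument they never used, since the tenant is already embedded into the storage.SearchQuery that callers build via storage.NewSearchQuery(at.AccountID, at.ProjectID, ...). Functions that take a SearchQuery drop the token outright, while functions without one (GraphiteTags, GraphiteTagValues, TagValueSuffixes, SeriesCount) now accept explicit accountID/projectID uint32 arguments instead of the whole token. A minimal before/after sketch of the calling convention, using variables as they appear in the handlers below:

	// before: the token rode along even though sq already carries the tenant
	n, err := netstorage.DeleteSeries(nil, at, sq, deadline)
	// after: the redundant argument is gone
	n, err := netstorage.DeleteSeries(nil, sq, deadline)

	// before: tenant-scoped helpers without a SearchQuery took the whole token
	labels, isPartial, err := netstorage.GraphiteTags(nil, at, denyPartialResponse, filter, limit, deadline)
	// after: they take only the two tenant IDs they actually use
	labels, isPartial, err := netstorage.GraphiteTags(nil, at.AccountID, at.ProjectID, denyPartialResponse, filter, limit, deadline)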

app/vmselect/graphite/metrics_api.go

@@ -206,7 +206,7 @@ func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
 	jsonp := r.FormValue("jsonp")
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, nil, 0)
-	metricNames, isPartial, err := netstorage.LabelValues(nil, at, denyPartialResponse, "__name__", sq, 0, deadline)
+	metricNames, isPartial, err := netstorage.LabelValues(nil, denyPartialResponse, "__name__", sq, 0, deadline)
 	if err != nil {
 		return fmt.Errorf(`cannot obtain metric names: %w`, err)
 	}
@@ -228,7 +228,7 @@ func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange,
 	n := strings.IndexAny(qTail, "*{[")
 	if n < 0 {
 		query := qHead + qTail
-		suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at, denyPartialResponse, tr, label, query, delimiter, deadline)
+		suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at.AccountID, at.ProjectID, denyPartialResponse, tr, label, query, delimiter, deadline)
 		if err != nil {
 			return nil, false, err
 		}
@@ -248,7 +248,7 @@ func metricsFind(at *auth.Token, denyPartialResponse bool, tr storage.TimeRange,
 	}
 	if n == len(qTail)-1 && strings.HasSuffix(qTail, "*") {
 		query := qHead + qTail[:len(qTail)-1]
-		suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at, denyPartialResponse, tr, label, query, delimiter, deadline)
+		suffixes, isPartial, err := netstorage.TagValueSuffixes(nil, at.AccountID, at.ProjectID, denyPartialResponse, tr, label, query, delimiter, deadline)
 		if err != nil {
 			return nil, false, err
 		}

app/vmselect/graphite/tags_api.go

@@ -51,7 +51,7 @@ func TagsDelSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWr
 	}
 	tfss := joinTagFilterss(tfs, etfs)
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tfss, 0)
-	n, err := netstorage.DeleteSeries(nil, at, sq, deadline)
+	n, err := netstorage.DeleteSeries(nil, sq, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot delete series for %q: %w", sq, err)
 	}
@@ -128,7 +128,7 @@ func registerMetrics(startTime time.Time, at *auth.Token, w http.ResponseWriter,
 		mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], at.AccountID, at.ProjectID, labels)
 		mr.Timestamp = ct
 	}
-	if err := netstorage.RegisterMetricNames(nil, at, mrs, deadline); err != nil {
+	if err := netstorage.RegisterMetricNames(nil, mrs, deadline); err != nil {
 		return fmt.Errorf("cannot register paths: %w", err)
 	}
@@ -183,7 +183,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, at *auth.Token, w http.R
 		// Escape special chars in tagPrefix as Graphite does.
 		// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/base.py#L228
 		filter := regexp.QuoteMeta(valuePrefix)
-		tagValues, isPartial, err = netstorage.GraphiteTagValues(nil, at, denyPartialResponse, tag, filter, limit, deadline)
+		tagValues, isPartial, err = netstorage.GraphiteTagValues(nil, at.AccountID, at.ProjectID, denyPartialResponse, tag, filter, limit, deadline)
 		if err != nil {
 			return err
 		}
@@ -193,7 +193,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, at *auth.Token, w http.R
 		if err != nil {
 			return err
 		}
-		metricNames, isPartialResponse, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
+		metricNames, isPartialResponse, err := netstorage.SearchMetricNames(nil, denyPartialResponse, sq, deadline)
 		if err != nil {
 			return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
 		}
@@ -273,7 +273,7 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, at *auth.Token, w http.Res
 		// Escape special chars in tagPrefix as Graphite does.
 		// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/base.py#L181
 		filter := regexp.QuoteMeta(tagPrefix)
-		labels, isPartial, err = netstorage.GraphiteTags(nil, at, denyPartialResponse, filter, limit, deadline)
+		labels, isPartial, err = netstorage.GraphiteTags(nil, at.AccountID, at.ProjectID, denyPartialResponse, filter, limit, deadline)
 		if err != nil {
 			return err
 		}
@@ -283,7 +283,7 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, at *auth.Token, w http.Res
 		if err != nil {
 			return err
 		}
-		metricNames, isPartialResponse, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
+		metricNames, isPartialResponse, err := netstorage.SearchMetricNames(nil, denyPartialResponse, sq, deadline)
 		if err != nil {
 			return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
 		}
@@ -353,7 +353,7 @@ func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseW
 		return err
 	}
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
-	metricNames, isPartial, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
+	metricNames, isPartial, err := netstorage.SearchMetricNames(nil, denyPartialResponse, sq, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
 	}
@@ -418,7 +418,7 @@ func TagValuesHandler(startTime time.Time, at *auth.Token, tagName string, w htt
 	}
 	filter := r.FormValue("filter")
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
-	tagValues, isPartial, err := netstorage.GraphiteTagValues(nil, at, denyPartialResponse, tagName, filter, limit, deadline)
+	tagValues, isPartial, err := netstorage.GraphiteTagValues(nil, at.AccountID, at.ProjectID, denyPartialResponse, tagName, filter, limit, deadline)
 	if err != nil {
 		return err
 	}
@@ -447,7 +447,7 @@ func TagsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *
 	}
 	filter := r.FormValue("filter")
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
-	labels, isPartial, err := netstorage.GraphiteTags(nil, at, denyPartialResponse, filter, limit, deadline)
+	labels, isPartial, err := netstorage.GraphiteTags(nil, at.AccountID, at.ProjectID, denyPartialResponse, filter, limit, deadline)
 	if err != nil {
 		return err
 	}

app/vmselect/netstorage/netstorage.go

@@ -16,7 +16,6 @@ import (
 	"time"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -61,7 +60,6 @@ func (r *Result) reset() {
 // Results holds results returned from ProcessSearchQuery.
 type Results struct {
-	at       *auth.Token
 	tr       storage.TimeRange
 	deadline searchutils.Deadline
@@ -150,7 +148,7 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 		atomic.StoreUint32(tsw.mustStop, 1)
 		return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
 	}
-	if err := tsw.pts.Unpack(r, rss.tbf, rss.tr, rss.at); err != nil {
+	if err := tsw.pts.Unpack(r, rss.tbf, rss.tr); err != nil {
 		atomic.StoreUint32(tsw.mustStop, 1)
 		return fmt.Errorf("error during time series unpacking: %w", err)
 	}
@@ -290,7 +288,6 @@ type unpackWorkItem struct {
 type unpackWork struct {
 	ws     []unpackWorkItem
 	tbf    *tmpBlocksFile
-	at     *auth.Token
 	sbs    []*sortBlock
 	doneCh chan error
 }
@@ -304,7 +301,6 @@ func (upw *unpackWork) reset() {
 	}
 	upw.ws = upw.ws[:0]
 	upw.tbf = nil
-	upw.at = nil
 	sbs := upw.sbs
 	for i := range sbs {
 		sbs[i] = nil
@@ -318,7 +314,7 @@ func (upw *unpackWork) reset() {
 func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
 	for _, w := range upw.ws {
 		sb := getSortBlock()
-		if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr, upw.at); err != nil {
+		if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr); err != nil {
 			putSortBlock(sb)
 			upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
 			return
@@ -390,7 +386,7 @@ var tmpBlockPool sync.Pool
 var unpackBatchSize = 5000
 // Unpack unpacks pts to dst.
-func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, at *auth.Token) error {
+func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
 	dst.reset()
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
@@ -426,14 +422,12 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
 	upw := getUnpackWork()
 	upw.tbf = tbf
-	upw.at = at
 	for _, addr := range pts.addrs {
 		if len(upw.ws) >= unpackBatchSize {
 			scheduleUnpackWork(workChs, upw)
 			upws = append(upws, upw)
 			upw = getUnpackWork()
 			upw.tbf = tbf
-			upw.at = at
 		}
 		upw.ws = append(upw.ws, unpackWorkItem{
 			addr: addr,
@@ -567,7 +561,7 @@ func (sb *sortBlock) reset() {
 	sb.NextIdx = 0
 }
-func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, at *auth.Token) error {
+func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
 	tmpBlock.Reset()
 	tbf.MustReadBlockAt(tmpBlock, addr)
 	if err := tmpBlock.UnmarshalData(); err != nil {
@@ -607,7 +601,7 @@ func (sbh *sortBlocksHeap) Pop() interface{} {
 }
 // RegisterMetricNames registers metric names from mrs in the storage.
-func RegisterMetricNames(qt *querytracer.Tracer, at *auth.Token, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
+func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline searchutils.Deadline) error {
 	qt = qt.NewChild("register metric names")
 	defer qt.Done()
 	// Split mrs among available vmstorage nodes.
@@ -645,7 +639,7 @@ func RegisterMetricNames(qt *querytracer.Tracer, at *auth.Token, mrs []storage.M
 }
 // DeleteSeries deletes time series matching the given sq.
-func DeleteSeries(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
+func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
 	qt = qt.NewChild("delete series: %s", sq)
 	defer qt.Done()
 	requestData := sq.Marshal(nil)
@@ -684,7 +678,7 @@ func DeleteSeries(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuer
 }
 // LabelNames returns label names matching the given sq until the given deadline.
-func LabelNames(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, maxLabelNames int, deadline searchutils.Deadline) ([]string, bool, error) {
+func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, maxLabelNames int, deadline searchutils.Deadline) ([]string, bool, error) {
 	qt = qt.NewChild("get labels: %s", sq)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -737,14 +731,14 @@ func LabelNames(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool
 }
 // GraphiteTags returns Graphite tags until the given deadline.
-func GraphiteTags(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, filter string, limit int, deadline searchutils.Deadline) ([]string, bool, error) {
+func GraphiteTags(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, filter string, limit int, deadline searchutils.Deadline) ([]string, bool, error) {
 	qt = qt.NewChild("get graphite tags: filter=%s, limit=%d", filter, limit)
 	defer qt.Done()
 	if deadline.Exceeded() {
 		return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
 	}
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, nil, 0)
-	labels, isPartial, err := LabelNames(qt, at, denyPartialResponse, sq, 0, deadline)
+	sq := storage.NewSearchQuery(accountID, projectID, 0, 0, nil, 0)
+	labels, isPartial, err := LabelNames(qt, denyPartialResponse, sq, 0, deadline)
 	if err != nil {
 		return nil, false, err
 	}
@@ -785,8 +779,7 @@ func hasString(a []string, s string) bool {
 }
 // LabelValues returns label values matching the given labelName and sq until the given deadline.
-func LabelValues(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, labelName string, sq *storage.SearchQuery,
-	maxLabelValues int, deadline searchutils.Deadline) ([]string, bool, error) {
+func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName string, sq *storage.SearchQuery, maxLabelValues int, deadline searchutils.Deadline) ([]string, bool, error) {
 	qt = qt.NewChild("get values for label %s: %s", labelName, sq)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -840,7 +833,7 @@ func LabelValues(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse boo
 }
 // GraphiteTagValues returns tag values for the given tagName until the given deadline.
-func GraphiteTagValues(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, tagName, filter string, limit int, deadline searchutils.Deadline) ([]string, bool, error) {
+func GraphiteTagValues(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, tagName, filter string, limit int, deadline searchutils.Deadline) ([]string, bool, error) {
 	qt = qt.NewChild("get graphite tag values for tagName=%s, filter=%s, limit=%d", tagName, filter, limit)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -849,8 +842,8 @@ func GraphiteTagValues(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon
 	if tagName == "name" {
 		tagName = ""
 	}
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, nil, 0)
-	tagValues, isPartial, err := LabelValues(qt, at, denyPartialResponse, tagName, sq, 0, deadline)
+	sq := storage.NewSearchQuery(accountID, projectID, 0, 0, nil, 0)
+	tagValues, isPartial, err := LabelValues(qt, denyPartialResponse, tagName, sq, 0, deadline)
 	if err != nil {
 		return nil, false, err
 	}
@@ -869,7 +862,7 @@ func GraphiteTagValues(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon
 // TagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
 //
 // It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
-func TagValueSuffixes(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, tr storage.TimeRange, tagKey, tagValuePrefix string,
+func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, tr storage.TimeRange, tagKey, tagValuePrefix string,
 	delimiter byte, deadline searchutils.Deadline) ([]string, bool, error) {
 	qt = qt.NewChild("get tag value suffixes for tagKey=%s, tagValuePrefix=%s, timeRange=%s", tagKey, tagValuePrefix, &tr)
 	defer qt.Done()
@@ -883,7 +876,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, at *auth.Token, denyPartialRespons
 	}
 	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
 		sn.tagValueSuffixesRequests.Inc()
-		suffixes, err := sn.getTagValueSuffixes(qt, at.AccountID, at.ProjectID, tr, tagKey, tagValuePrefix, delimiter, deadline)
+		suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, deadline)
 		if err != nil {
 			sn.tagValueSuffixesErrors.Inc()
 			err = fmt.Errorf("cannot get tag value suffixes for tr=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c from vmstorage %s: %w",
@@ -933,7 +926,7 @@ func deduplicateStrings(a []string) []string {
 // TSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
 //
 // It accepts arbitrary filters on time series in sq.
-func TSDBStatus(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) {
+func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) {
 	qt = qt.NewChild("get tsdb stats: %s, focusLabel=%q, topN=%d", sq, focusLabel, topN)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -1038,8 +1031,8 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
 	return a
 }
-// SeriesCount returns the number of unique series for the given at.
-func SeriesCount(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline) (uint64, bool, error) {
+// SeriesCount returns the number of unique series.
+func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, deadline searchutils.Deadline) (uint64, bool, error) {
 	qt = qt.NewChild("get series count")
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -1052,7 +1045,7 @@ func SeriesCount(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse boo
 	}
 	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
 		sn.seriesCountRequests.Inc()
-		n, err := sn.getSeriesCount(qt, at.AccountID, at.ProjectID, deadline)
+		n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
 		if err != nil {
 			sn.seriesCountErrors.Inc()
 			err = fmt.Errorf("cannot get series count from vmstorage %s: %w", sn.connPool.Addr(), err)
@@ -1132,7 +1125,7 @@ var metricNamePool = &sync.Pool{
 // f is called in parallel from multiple goroutines.
 // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
 // It is the responsibility of f to filter blocks according to the given tr.
-func ExportBlocks(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline,
+func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline,
 	f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {
 	qt = qt.NewChild("export blocks: %s", sq)
 	defer qt.Done()
@@ -1166,7 +1159,7 @@ func ExportBlocks(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuer
 		atomic.AddUint64(&samples, uint64(mb.Block.RowsCount()))
 		return nil
 	}
-	_, err := processSearchQuery(qt, at, true, sq, processBlock, deadline)
+	_, err := processSearchQuery(qt, true, sq, processBlock, deadline)
 	// Make sure processBlock isn't called anymore in order to prevent from data races.
 	atomic.StoreUint32(&stopped, 1)
@@ -1182,7 +1175,7 @@ func ExportBlocks(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuer
 // SearchMetricNames returns all the metric names matching sq until the given deadline.
 //
 // The returned metric names must be unmarshaled via storage.MetricName.UnmarshalString().
-func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]string, bool, error) {
+func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]string, bool, error) {
 	qt = qt.NewChild("fetch metric names: %s", sq)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -1236,7 +1229,7 @@ func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon
 // ProcessSearchQuery performs sq until the given deadline.
 //
 // Results.RunParallel or Results.Cancel must be called on the returned Results.
-func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) (*Results, bool, error) {
+func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) (*Results, bool, error) {
 	qt = qt.NewChild("fetch matching series: %s", sq)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -1272,7 +1265,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo
 		}
 		return nil
 	}
-	isPartial, err := processSearchQuery(qt, at, denyPartialResponse, sq, processBlock, deadline)
+	isPartial, err := processSearchQuery(qt, denyPartialResponse, sq, processBlock, deadline)
 	// Make sure processBlock isn't called anymore in order to protect from data races.
 	atomic.StoreUint32(&stopped, 1)
@@ -1289,7 +1282,6 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo
 	qt.Printf("fetch unique series=%d, blocks=%d, samples=%d, bytes=%d", len(tbfw.m), blocksRead, samples, tbfw.tbf.Len())
 	var rss Results
-	rss.at = at
 	rss.tr = tr
 	rss.deadline = deadline
 	rss.tbf = tbfw.tbf
@@ -1304,7 +1296,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialRespo
 	return &rss, isPartial, nil
 }
-func processSearchQuery(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery,
+func processSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
 	processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
 	requestData := sq.Marshal(nil)
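
The netstorage.go hunks also show why the token was dead weight on the read path: the tenant IDs are serialized into the SearchQuery itself and travel to the vmstorage nodes inside the marshaled request, so an extra *auth.Token next to sq duplicated information that was already there. A condensed sketch of that flow, with names taken from the hunks above:

	// the caller bakes the tenant into the query itself...
	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tfss, maxSeries)
	// ...and netstorage ships the marshaled query, tenant included, to each vmstorage node
	requestData := sq.Marshal(nil)

The unpacking side (Results, unpackWork, sortBlock.unpackFrom) operates on blocks that were already tenant-scoped by the search, which is why its at fields and arguments could be deleted wholesale.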

app/vmselect/prometheus/prometheus.go

@@ -78,7 +78,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
 	}
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxFederateSeries)
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
-	rss, isPartial, err := netstorage.ProcessSearchQuery(nil, at, denyPartialResponse, sq, cp.deadline)
+	rss, isPartial, err := netstorage.ProcessSearchQuery(nil, denyPartialResponse, sq, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 	}
@@ -145,7 +145,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
 	// Unconditionally deny partial response for the exported data,
 	// since users usually expect that the exported data is full.
 	denyPartialResponse := true
-	rss, _, err := netstorage.ProcessSearchQuery(nil, at, denyPartialResponse, sq, cp.deadline)
+	rss, _, err := netstorage.ProcessSearchQuery(nil, denyPartialResponse, sq, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 	}
@@ -168,7 +168,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
 		}()
 	} else {
 		go func() {
-			err := netstorage.ExportBlocks(nil, at, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+			err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
				if err := bw.Error(); err != nil {
					return err
				}
@@ -226,7 +226,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
 	_, _ = bw.Write(trBuf)
 	// Marshal native blocks.
-	err = netstorage.ExportBlocks(nil, at, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+	err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
 		if err := bw.Error(); err != nil {
 			return err
 		}
@@ -350,7 +350,7 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
 	// Unconditionally deny partial response for the exported data,
 	// since users usually expect that the exported data is full.
 	denyPartialResponse := true
-	rss, _, err := netstorage.ProcessSearchQuery(qt, at, denyPartialResponse, sq, cp.deadline)
+	rss, _, err := netstorage.ProcessSearchQuery(qt, denyPartialResponse, sq, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 	}
@@ -376,7 +376,7 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
 	} else {
 		qtChild := qt.NewChild("background export format=%s", format)
 		go func() {
-			err := netstorage.ExportBlocks(qtChild, at, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+			err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
				if err := bw.Error(); err != nil {
					return err
				}
@@ -443,7 +443,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
 		return fmt.Errorf("start=%d and end=%d args aren't supported. Remove these args from the query in order to delete all the matching metrics", cp.start, cp.end)
 	}
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, 0)
-	deletedCount, err := netstorage.DeleteSeries(nil, at, sq, cp.deadline)
+	deletedCount, err := netstorage.DeleteSeries(nil, sq, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot delete time series: %w", err)
 	}
@@ -516,7 +516,7 @@ func LabelValuesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.To
 	}
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxUniqueTimeseries)
-	labelValues, isPartial, err := netstorage.LabelValues(qt, at, denyPartialResponse, labelName, sq, limit, cp.deadline)
+	labelValues, isPartial, err := netstorage.LabelValues(qt, denyPartialResponse, labelName, sq, limit, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot obtain values for label %q: %w", labelName, err)
 	}
@@ -582,7 +582,7 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
 	start := int64(date*secsPerDay) * 1000
 	end := int64((date+1)*secsPerDay)*1000 - 1
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, cp.filterss, *maxTSDBStatusSeries)
-	status, isPartial, err := netstorage.TSDBStatus(qt, at, denyPartialResponse, sq, focusLabel, topN, cp.deadline)
+	status, isPartial, err := netstorage.TSDBStatus(qt, denyPartialResponse, sq, focusLabel, topN, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot obtain tsdb stats: %w", err)
 	}
@@ -615,7 +615,7 @@ func LabelsHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token,
 	}
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxUniqueTimeseries)
-	labels, isPartial, err := netstorage.LabelNames(qt, at, denyPartialResponse, sq, limit, cp.deadline)
+	labels, isPartial, err := netstorage.LabelNames(qt, denyPartialResponse, sq, limit, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot obtain labels: %w", err)
 	}
@@ -638,7 +638,7 @@ func SeriesCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWrit
 	deadline := searchutils.GetDeadlineForStatusRequest(r, startTime)
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
-	n, isPartial, err := netstorage.SeriesCount(nil, at, denyPartialResponse, deadline)
+	n, isPartial, err := netstorage.SeriesCount(nil, at.AccountID, at.ProjectID, denyPartialResponse, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot obtain series count: %w", err)
 	}
@@ -675,7 +675,7 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token,
 	}
 	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxSeriesLimit)
 	denyPartialResponse := searchutils.GetDenyPartialResponse(r)
-	metricNames, isPartial, err := netstorage.SearchMetricNames(qt, at, denyPartialResponse, sq, cp.deadline)
+	metricNames, isPartial, err := netstorage.SearchMetricNames(qt, denyPartialResponse, sq, cp.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
 	}

app/vmselect/promql/eval.go

@@ -930,7 +930,7 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa
 		minTimestamp -= ec.Step
 	}
 	sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries)
-	rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.AuthToken, ec.DenyPartialResponse, sq, ec.Deadline)
+	rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline)
 	if err != nil {
 		return nil, err
 	}