diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index a71bdf0d4..0b7db09bd 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -11,6 +11,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/clusternative"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
@@ -34,7 +36,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
-	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
@@ -268,7 +269,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 		httpserver.Errorf(w, r, "cannot parse path %q: %s", path, err)
 		return true
 	}
-	at, err := auth.NewToken(p.AuthToken)
+	at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
 	if err != nil {
 		httpserver.Errorf(w, r, "auth error: %s", err)
 		return true
@@ -309,6 +310,10 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
 			return true
 		}
 	}
+	if strings.HasPrefix(p.Suffix, "graphite/") && at == nil {
+		httpserver.Errorf(w, r, "multi-tenant queries are not supported by Graphite endpoints")
+		return true
+	}
 	if strings.HasPrefix(p.Suffix, "graphite/tags/") && !isGraphiteTagsPath(p.Suffix[len("graphite"):]) {
 		tagName := p.Suffix[len("graphite/tags/"):]
 		graphiteTagValuesRequests.Inc()
@@ -651,7 +656,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
 	}
 	switch p.Suffix {
 	case "prometheus/api/v1/status/active_queries":
-		at, err := auth.NewToken(p.AuthToken)
+		at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
 		if err != nil {
 			return false
 		}
@@ -660,7 +665,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
 		promql.ActiveQueriesHandler(at, w, r)
 		return true
 	case "prometheus/api/v1/status/top_queries":
-		at, err := auth.NewToken(p.AuthToken)
+		at, err := auth.NewTokenPossibleMultitenant(p.AuthToken)
 		if err != nil {
 			return false
 		}
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index b9a512607..294c0f55b 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"os"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -79,8 +80,9 @@ func (r *Result) reset() {
 
 // Results holds results returned from ProcessSearchQuery.
 type Results struct {
-	tr       storage.TimeRange
-	deadline searchutils.Deadline
+	shouldConvertTenantToLabels bool
+	tr                          storage.TimeRange
+	deadline                    searchutils.Deadline
 
 	tbfs []*tmpBlocksFile
 
@@ -265,14 +267,24 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		// Nothing to process
 		return 0, nil
 	}
-
+	cb := f
+	if rss.shouldConvertTenantToLabels {
+		cb = func(rs *Result, workerID uint) error {
+			// TODO: (@f41gh7) if label duplicates are ever fixed, the query will
+			// return a "Duplicate Output Series" error; in that case, TenantToTags
+			// must be moved into the RegisterAndWriteBlock method
+			metricNameTenantToTags(&rs.MetricName)
+			return f(rs, workerID)
+		}
+	}
 	var mustStop atomic.Bool
 	initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) {
 		tsw.rss = rss
 		tsw.pts = pts
-		tsw.f = f
+		tsw.f = cb
 		tsw.mustStop = &mustStop
 	}
+
 	maxWorkers := MaxWorkers()
 	if maxWorkers == 1 || tswsLen == 1 {
 		// It is faster to process time series in the current goroutine.
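The wrapped callback above relies on `metricNameTenantToTags` (defined later in netstorage.go) to rewrite each series' tenant identifiers into labels. A minimal standalone sketch of that conversion contract — the types below are simplified stand-ins, not the real `storage.MetricName`:

```go
package main

import (
	"fmt"
	"strconv"
)

// Simplified stand-ins for storage.Tag and storage.MetricName,
// used only to illustrate the conversion contract.
type tag struct{ key, value string }

type metricName struct {
	accountID, projectID uint32
	tags                 []tag
}

// tenantToTags mirrors metricNameTenantToTags from this diff: the tenant
// identifiers move into vm_account_id/vm_project_id tags and the numeric
// fields are zeroed afterwards.
func tenantToTags(mn *metricName) {
	mn.tags = append(mn.tags,
		tag{"vm_account_id", strconv.FormatUint(uint64(mn.accountID), 10)},
		tag{"vm_project_id", strconv.FormatUint(uint64(mn.projectID), 10)},
	)
	mn.accountID = 0
	mn.projectID = 0
}

func main() {
	mn := metricName{accountID: 42, projectID: 7}
	tenantToTags(&mn)
	fmt.Println(mn.tags) // [{vm_account_id 42} {vm_project_id 7}]
}
```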
@@ -834,7 +846,6 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli
 func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
 	qt = qt.NewChild("delete series: %s", sq)
 	defer qt.Done()
-	requestData := sq.Marshal(nil)
 
 	// Send the query to all the storage nodes in parallel.
 	type nodeResult struct {
@@ -843,25 +854,36 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 	}
 	sns := getStorageNodes()
 	snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		sn.deleteSeriesRequests.Inc()
-		deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
+		err := populateSqTenantTokensIfNeeded(sq)
 		if err != nil {
-			sn.deleteSeriesErrors.Inc()
-		}
-		return &nodeResult{
-			deletedCount: deletedCount,
-			err:          err,
+			return []*nodeResult{{
+				err: err,
+			}}
 		}
+
+		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
+			sn.deleteSeriesRequests.Inc()
+			deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
+			if err != nil {
+				sn.deleteSeriesErrors.Inc()
+			}
+			return &nodeResult{
+				deletedCount: deletedCount,
+				err:          err,
+			}
+		})
 	})
 
 	// Collect results
 	deletedTotal := 0
 	err := snr.collectAllResults(func(result any) error {
-		nr := result.(*nodeResult)
-		if nr.err != nil {
-			return nr.err
+		for _, cr := range result.([]any) {
+			nr := cr.(*nodeResult)
+			if nr.err != nil {
+				return nr.err
+			}
+			deletedTotal += nr.deletedCount
 		}
-		deletedTotal += nr.deletedCount
 		return nil
 	})
 	if err != nil {
@@ -877,7 +899,6 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 	if deadline.Exceeded() {
 		return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
 	}
-	requestData := sq.Marshal(nil)
 	// Send the query to all the storage nodes in parallel.
 	type nodeResult struct {
 		labelNames []string
@@ -885,28 +906,43 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 	}
 	sns := getStorageNodes()
 	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		sn.labelNamesRequests.Inc()
-		labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
+		err := populateSqTenantTokensIfNeeded(sq)
 		if err != nil {
-			sn.labelNamesErrors.Inc()
-			err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
-		}
-		return &nodeResult{
-			labelNames: labelNames,
-			err:        err,
+			return []*nodeResult{{
+				err: err,
+			}}
 		}
+
+		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
+			sn.labelNamesRequests.Inc()
+			labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
+			if err != nil {
+				sn.labelNamesErrors.Inc()
+				err = fmt.Errorf("cannot get labels from vmstorage %s: %w", sn.connPool.Addr(), err)
+			}
+			return &nodeResult{
+				labelNames: labelNames,
+				err:        err,
+			}
+		})
 	})
 
 	// Collect results
 	var labelNames []string
 	isPartial, err := snr.collectResults(partialLabelNamesResults, func(result any) error {
-		nr := result.(*nodeResult)
-		if nr.err != nil {
-			return nr.err
+		for _, cr := range result.([]any) {
+			nr := cr.(*nodeResult)
+			if nr.err != nil {
+				return nr.err
+			}
+			labelNames = append(labelNames, nr.labelNames...)
 		}
-		labelNames = append(labelNames, nr.labelNames...)
+		return nil
 	})
+	if sq.IsMultiTenant {
+		labelNames = append(labelNames, "vm_account_id", "vm_project_id")
+	}
 	qt.Printf("get %d non-duplicated labels", len(labelNames))
 	if err != nil {
 		return nil, isPartial, fmt.Errorf("cannot fetch labels from vmstorage nodes: %w", err)
 	}
@@ -979,7 +1015,36 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
 	if deadline.Exceeded() {
 		return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
 	}
-	requestData := sq.Marshal(nil)
+
+	if sq.IsMultiTenant && isTenancyLabel(labelName) {
+		tenants, err := Tenants(qt, sq.GetTimeRange(), deadline)
+		if err != nil {
+			return nil, false, err
+		}
+
+		var idx int
+		switch labelName {
+		case "vm_account_id":
+			idx = 0
+		case "vm_project_id":
+			idx = 1
+		default:
+			logger.Fatalf("BUG: unexpected labelName=%q", labelName)
+		}
+
+		labelValues := make([]string, 0, len(tenants))
+		for _, t := range tenants {
+			s := strings.Split(t, ":")
+			if len(s) != 2 {
+				logger.Fatalf("BUG: unexpected tenant received from storage: %q", t)
+			}
+
+			labelValues = append(labelValues, s[idx])
+		}
+
+		labelValues = prepareLabelValues(qt, labelValues, maxLabelValues)
+		return labelValues, false, nil
+	}
 
 	// Send the query to all the storage nodes in parallel.
 	type nodeResult struct {
@@ -988,33 +1053,49 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
 	}
 	sns := getStorageNodes()
 	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		sn.labelValuesRequests.Inc()
-		labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
+		err := populateSqTenantTokensIfNeeded(sq)
 		if err != nil {
-			sn.labelValuesErrors.Inc()
-			err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
-		}
-		return &nodeResult{
-			labelValues: labelValues,
-			err:         err,
+			return []*nodeResult{{
+				err: err,
+			}}
 		}
+
+		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
+			sn.labelValuesRequests.Inc()
+			labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
+			if err != nil {
+				sn.labelValuesErrors.Inc()
+				err = fmt.Errorf("cannot get label values from vmstorage %s: %w", sn.connPool.Addr(), err)
+			}
+			return &nodeResult{
+				labelValues: labelValues,
+				err:         err,
+			}
+		})
 	})
 
 	// Collect results
 	var labelValues []string
 	isPartial, err := snr.collectResults(partialLabelValuesResults, func(result any) error {
-		nr := result.(*nodeResult)
-		if nr.err != nil {
-			return nr.err
+		for _, cr := range result.([]any) {
+			nr := cr.(*nodeResult)
+			if nr.err != nil {
+				return nr.err
+			}
+			labelValues = append(labelValues, nr.labelValues...)
 		}
-		labelValues = append(labelValues, nr.labelValues...)
 		return nil
 	})
 	qt.Printf("get %d non-duplicated label values", len(labelValues))
 	if err != nil {
 		return nil, isPartial, fmt.Errorf("cannot fetch label values from vmstorage nodes: %w", err)
 	}
+	labelValues = prepareLabelValues(qt, labelValues, maxLabelValues)
+	return labelValues, isPartial, nil
+}
+
+func prepareLabelValues(qt *querytracer.Tracer, labelValues []string, maxLabelValues int) []string {
+	qt.Printf("get %d non-duplicated label values", len(labelValues))
 	// Deduplicate label values
 	labelValues = deduplicateStrings(labelValues)
 	qt.Printf("get %d unique label values after de-duplication", len(labelValues))
@@ -1024,7 +1105,7 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
 	}
 	sort.Strings(labelValues)
 	qt.Printf("sort %d label values", len(labelValues))
-	return labelValues, isPartial, nil
+	return labelValues
 }
 
 // Tenants returns tenants until the given deadline.
@@ -1110,7 +1191,8 @@ func GraphiteTagValues(qt *querytracer.Tracer, accountID, projectID uint32, deny
 //
 // It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
 func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyPartialResponse bool, tr storage.TimeRange, tagKey, tagValuePrefix string,
-	delimiter byte, maxSuffixes int, deadline searchutils.Deadline) ([]string, bool, error) {
+	delimiter byte, maxSuffixes int, deadline searchutils.Deadline,
+) ([]string, bool, error) {
 	qt = qt.NewChild("get tag value suffixes for tagKey=%s, tagValuePrefix=%s, maxSuffixes=%d, timeRange=%s", tagKey, tagValuePrefix, maxSuffixes, &tr)
 	defer qt.Done()
 	if deadline.Exceeded() {
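prepareLabelValues factors the dedupe → limit → sort pipeline out of LabelValues so the multi-tenant fast path above can reuse it. A standalone sketch of the same pipeline, with plain Go standing in for deduplicateStrings and the query tracer:

```go
package main

import (
	"fmt"
	"sort"
)

// prepareLabelValuesSketch mirrors the pipeline factored into
// prepareLabelValues: deduplicate, cap to maxLabelValues, then sort.
func prepareLabelValuesSketch(values []string, maxLabelValues int) []string {
	seen := make(map[string]struct{}, len(values))
	dst := values[:0]
	for _, v := range values {
		if _, ok := seen[v]; !ok {
			seen[v] = struct{}{}
			dst = append(dst, v)
		}
	}
	if maxLabelValues > 0 && maxLabelValues < len(dst) {
		dst = dst[:maxLabelValues]
	}
	sort.Strings(dst)
	return dst
}

func main() {
	// Duplicates collapse first, then the limit applies, then the result is sorted.
	fmt.Println(prepareLabelValuesSketch([]string{"b", "a", "b", "c"}, 2)) // [a b]
}
```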
@@ -1180,7 +1262,6 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 	if deadline.Exceeded() {
 		return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
 	}
-	requestData := sq.Marshal(nil)
 	// Send the query to all the storage nodes in parallel.
 	type nodeResult struct {
 		status *storage.TSDBStatus
@@ -1188,26 +1269,37 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 	}
 	sns := getStorageNodes()
 	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		sn.tsdbStatusRequests.Inc()
-		status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
+		err := populateSqTenantTokensIfNeeded(sq)
 		if err != nil {
-			sn.tsdbStatusErrors.Inc()
-			err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
-		}
-		return &nodeResult{
-			status: status,
-			err:    err,
+			return []*nodeResult{{
+				err: err,
+			}}
 		}
+
+		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
+			sn.tsdbStatusRequests.Inc()
+			status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
+			if err != nil {
+				sn.tsdbStatusErrors.Inc()
+				err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
+			}
+			return &nodeResult{
+				status: status,
+				err:    err,
+			}
+		})
 	})
 
 	// Collect results.
 	var statuses []*storage.TSDBStatus
 	isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result any) error {
-		nr := result.(*nodeResult)
-		if nr.err != nil {
-			return nr.err
+		for _, cr := range result.([]any) {
+			nr := cr.(*nodeResult)
+			if nr.err != nil {
+				return nr.err
+			}
+			statuses = append(statuses, nr.status)
 		}
-		statuses = append(statuses, nr.status)
 		return nil
 	})
 	if err != nil {
@@ -1559,7 +1651,8 @@ var metricNamePool = &sync.Pool{
 // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
 // It is the responsibility of f to filter blocks according to the given tr.
 func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline,
-	f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error) error {
+	f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error,
) error {
 	qt = qt.NewChild("export blocks: %s", sq)
 	defer qt.Done()
 	if deadline.Exceeded() {
@@ -1577,6 +1670,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		if err := mn.Unmarshal(mb.MetricName); err != nil {
 			return fmt.Errorf("cannot unmarshal metricName: %w", err)
 		}
+		metricNameTenantToTags(mn)
 		if err := f(mn, &mb.Block, tr, workerID); err != nil {
 			return err
 		}
@@ -1603,7 +1697,6 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
 	if deadline.Exceeded() {
 		return nil, false, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String())
 	}
-	requestData := sq.Marshal(nil)
 
 	// Send the query to all the storage nodes in parallel.
 	type nodeResult struct {
@@ -1612,27 +1705,47 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
 	}
 	sns := getStorageNodes()
 	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		sn.searchMetricNamesRequests.Inc()
-		metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
+		err := populateSqTenantTokensIfNeeded(sq)
 		if err != nil {
-			sn.searchMetricNamesErrors.Inc()
-			err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err)
-		}
-		return &nodeResult{
-			metricNames: metricNames,
-			err:         err,
+			return []*nodeResult{{
+				err: err,
+			}}
 		}
+		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
+			sn.searchMetricNamesRequests.Inc()
+			metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
+			if sq.IsMultiTenant {
+				// TODO: (@f41gh7) this function could produce duplicate labels
+				// if the original metricName already has tenant labels; fix it later
+				suffix := marshalAsTags(t.AccountID, t.ProjectID)
+				suffixStr := string(suffix)
+				for i := range metricNames {
+					metricNames[i] = metricNames[i] + suffixStr
+				}
+			}
+			if err != nil {
+				sn.searchMetricNamesErrors.Inc()
+				err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err)
+			}
+			return &nodeResult{
+				metricNames: metricNames,
+				err:         err,
+			}
+		})
 	})
 
 	// Collect results.
 	metricNamesMap := make(map[string]struct{})
 	isPartial, err := snr.collectResults(partialSearchMetricNamesResults, func(result any) error {
-		nr := result.(*nodeResult)
-		if nr.err != nil {
-			return nr.err
-		}
-		for _, metricName := range nr.metricNames {
-			metricNamesMap[metricName] = struct{}{}
+		for _, cr := range result.([]any) {
+			nr := cr.(*nodeResult)
+			if nr.err != nil {
+				return nr.err
+			}
+			for _, metricName := range nr.metricNames {
+				metricNamesMap[metricName] = struct{}{}
+			}
 		}
 		return nil
 	})
@@ -1644,11 +1757,22 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
 	for metricName := range metricNamesMap {
 		metricNames = append(metricNames, metricName)
 	}
-	sort.Strings(metricNames)
 	qt.Printf("sort %d metric names", len(metricNames))
 	return metricNames, isPartial, nil
 }
 
+func marshalAsTags(accountID, projectID uint32) []byte {
+	buf := make([]byte, 0, 64)
+	var tag storage.Tag
+	tag.Key = []byte("vm_account_id")
+	tag.Value = strconv.AppendUint(tag.Value, uint64(accountID), 10)
+	buf = tag.Marshal(buf)
+	tag.Key = []byte("vm_project_id")
+	tag.Value = strconv.AppendUint(tag.Value[:0], uint64(projectID), 10)
+	buf = tag.Marshal(buf)
+	return buf
+}
+
 // limitExceededErr error generated by vmselect
 // on checking complexity limits during processing responses
 // from storage nodes.
@@ -1722,21 +1846,22 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 			addrs: addrssPool[m[metricName]].addrs,
 		}
 	}
+	rss.shouldConvertTenantToLabels = sq.IsMultiTenant
 	rss.packedTimeseries = pts
 	return &rss, isPartial, nil
 }
 
 // ProcessBlocks calls processBlock per each block matching the given sq.
 func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
-	processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
+	processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline,
+) (bool, error) {
 	sns := getStorageNodes()
 	return processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
 }
 
 func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool, sq *storage.SearchQuery,
-	processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
-	requestData := sq.Marshal(nil)
-
+	processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline,
+) (bool, error) {
 	// Make sure that processBlock is no longer called after the exit from processBlocks() function.
 	// Use per-worker WaitGroup instead of a shared WaitGroup in order to avoid inter-CPU contention,
 	// which may significantly slow down the rate of processBlock calls on multi-CPU systems.
@@ -1773,12 +1898,31 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
 	// Send the query to all the storage nodes in parallel.
 	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
-		sn.searchRequests.Inc()
-		err := sn.processSearchQuery(qt, requestData, f, workerID, deadline)
-		if err != nil {
-			sn.searchErrors.Inc()
-			err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
+		if err := populateSqTenantTokensIfNeeded(sq); err != nil {
+			return &err
 		}
+
+		res := execSearchQuery(qt, sq, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
+			sn.searchRequests.Inc()
+			err := sn.processSearchQuery(qt, rd, f, workerID, deadline)
+			if err != nil {
+				sn.searchErrors.Inc()
+				err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
+			}
+			// Return a pointer to this call's own error variable, so results for
+			// different tenants do not alias a single shared variable.
+			return &err
+		})
+
+		for _, e := range res {
+			if err := *e.(*error); err != nil {
+				return &err
+			}
+		}
+		var noErr error
+		return &noErr
 	})
 
@@ -1803,6 +1947,21 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
 	return isPartial, nil
 }
 
+func populateSqTenantTokensIfNeeded(sq *storage.SearchQuery) error {
+	if !sq.IsMultiTenant {
+		return nil
+	}
+
+	if len(sq.TagFilterss) == 0 {
+		return nil
+	}
+
+	tts, tfss := ApplyTenantFiltersToTagFilters(sq.TenantTokens, sq.TagFilterss)
+	sq.TenantTokens = tts
+	sq.TagFilterss = tfss
+	return nil
+}
+
 type storageNodesRequest struct {
 	denyPartialResponse bool
 	resultsCh           chan rpcResult
@@ -1817,7 +1976,8 @@ type rpcResult struct {
 }
 
 func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
-	f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any) *storageNodesRequest {
+	f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any,
+) *storageNodesRequest {
 	resultsCh := make(chan rpcResult, len(sns))
 	qts := make(map[*querytracer.Tracer]struct{}, len(sns))
 	for idx, sn := range sns {
@@ -2184,7 +2344,8 @@ func (sn *storageNode) getTenants(qt *querytracer.Tracer, tr storage.TimeRange,
 }
 
 func (sn *storageNode) getTagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string,
-	delimiter byte, maxSuffixes int, deadline searchutils.Deadline) ([]string, error) {
+	delimiter byte, maxSuffixes int, deadline searchutils.Deadline,
+) ([]string, error) {
 	var suffixes []string
 	f := func(bc *handshake.BufferedConn) error {
 		ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes)
@@ -2249,7 +2410,8 @@ func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestD
 }
 
 func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerID uint) error,
-	workerID uint, deadline searchutils.Deadline) error {
+	workerID uint, deadline searchutils.Deadline,
+) error {
 	f := func(bc *handshake.BufferedConn) error {
 		return sn.processSearchQueryOnConn(bc, requestData, processBlock, workerID)
 	}
@@ -2490,8 +2652,10 @@ func (sn *storageNode) getLabelNamesOnConn(bc *handshake.BufferedConn, requestDa
 	}
 }
 
-const maxLabelValueSize = 16 * 1024 * 1024
-const maxTenantValueSize = 16 * 1024 * 1024 // TODO: calc 'uint32:uint32'
+const (
+	maxLabelValueSize  = 16 * 1024 * 1024
+	maxTenantValueSize = 16 * 1024 * 1024 // TODO: calc 'uint32:uint32'
+)
 
 func (sn *storageNode) getLabelValuesOnConn(bc *handshake.BufferedConn, labelName string, requestData []byte, maxLabelValues int) ([]string, error) {
 	// Send the request to sn.
@@ -2575,7 +2739,8 @@ func (sn *storageNode) getTenantsOnConn(bc *handshake.BufferedConn, tr storage.T
 }
 
 func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, accountID, projectID uint32,
-	tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int) ([]string, error) {
+	tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int,
+) ([]string, error) {
 	// Send the request to sn.
 	if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
 		return nil, err
@@ -2789,7 +2954,8 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn
 const maxMetricNameSize = 64 * 1024
 
 func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte,
-	processBlock func(mb *storage.MetricBlock, workerID uint) error, workerID uint) error {
+	processBlock func(mb *storage.MetricBlock, workerID uint) error, workerID uint,
+) error {
 	// Send the request to sn.
 	if err := writeBytes(bc, requestData); err != nil {
 		return fmt.Errorf("cannot write requestData: %w", err)
@@ -3112,3 +3278,41 @@ func (pnc *perNodeCounter) GetTotal() uint64 {
 //
 // See https://github.com/golang/go/blob/704401ffa06c60e059c9e6e4048045b4ff42530a/src/runtime/malloc.go#L11
 const maxFastAllocBlockSize = 32 * 1024
+
+// execSearchQuery calls cb with marshaled requestData for each tenant in sq.
+func execSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, cb func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any) []any {
+	var requestData []byte
+	var results []any
+
+	for i := range sq.TenantTokens {
+		requestData = sq.TenantTokens[i].Marshal(requestData)
+		requestData = sq.MarshaWithoutTenant(requestData)
+		qtL := qt
+		if sq.IsMultiTenant && qt.Enabled() {
+			qtL = qt.NewChild("query for tenant: %s", sq.TenantTokens[i].String())
+		}
+		r := cb(qtL, requestData, sq.TenantTokens[i])
+		if sq.IsMultiTenant {
+			qtL.Done()
+		}
+		results = append(results, r)
+		requestData = requestData[:0]
+	}
+
+	return results
+}
+
+// metricNameTenantToTags moves AccountID:ProjectID into the corresponding
+// tenant tags (vm_account_id and vm_project_id) and zeroes the AccountID
+// and ProjectID fields afterwards.
+// TODO: @f41gh7 this function could produce duplicates
+// if the original metric name already has tenant labels
+func metricNameTenantToTags(mn *storage.MetricName) {
+	buf := make([]byte, 0, 8)
+	buf = strconv.AppendUint(buf, uint64(mn.AccountID), 10)
+	mn.AddTagBytes([]byte(`vm_account_id`), buf)
+	buf = strconv.AppendUint(buf[:0], uint64(mn.ProjectID), 10)
+	mn.AddTagBytes([]byte(`vm_project_id`), buf)
+	mn.AccountID = 0
+	mn.ProjectID = 0
+}
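execSearchQuery is the heart of the per-tenant fan-out: each tenant token is marshaled in front of the tenant-agnostic query body, and the callback runs once per tenant. A simplified standalone model of that shape — string payloads stand in for the real binary marshaling, and none of these names come from the netstorage package:

```go
package main

import "fmt"

type tenantToken struct{ accountID, projectID uint32 }

type searchQuery struct {
	tenants []tenantToken
	body    string // stands in for the marshaled tenant-agnostic query
}

// forEachTenant mirrors the shape of execSearchQuery: one request payload
// and one callback invocation per tenant token, results collected in order.
func forEachTenant(sq *searchQuery, cb func(requestData string, t tenantToken) error) []error {
	var results []error
	for _, t := range sq.tenants {
		requestData := fmt.Sprintf("%d:%d|%s", t.accountID, t.projectID, sq.body)
		results = append(results, cb(requestData, t))
	}
	return results
}

func main() {
	sq := &searchQuery{
		tenants: []tenantToken{{42, 0}, {42, 7}},
		body:    `up{job="node"}`,
	}
	errs := forEachTenant(sq, func(rd string, t tenantToken) error {
		fmt.Printf("tenant %d:%d -> %s\n", t.accountID, t.projectID, rd)
		return nil
	})
	fmt.Println(len(errs), "requests issued")
}
```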
diff --git a/app/vmselect/netstorage/tenant_cache.go b/app/vmselect/netstorage/tenant_cache.go
new file mode 100644
index 000000000..c039f3536
--- /dev/null
+++ b/app/vmselect/netstorage/tenant_cache.go
@@ -0,0 +1,189 @@
+package netstorage
+
+import (
+	"flag"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/VictoriaMetrics/metrics"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
+)
+
+var (
+	tenantsCacheDuration = flag.Duration("search.tenantCacheExpireDuration", 5*time.Minute, "The expiry duration for the list of tenants used by multi-tenant queries.")
+)
+
+// TenantsCached returns the list of tenants available in the storage.
+func TenantsCached(qt *querytracer.Tracer, tr storage.TimeRange, deadline searchutils.Deadline) ([]storage.TenantToken, error) {
+	qt.Printf("fetching tenants on timeRange=%s", tr.String())
+
+	cached := tenantsCacheV.get(tr)
+	qt.Printf("fetched %d tenants from cache", len(cached))
+	if len(cached) > 0 {
+		return cached, nil
+	}
+
+	tenants, err := Tenants(qt, tr, deadline)
+	if err != nil {
+		return nil, fmt.Errorf("cannot obtain tenants: %w", err)
+	}
+
+	qt.Printf("fetched %d tenants from storage", len(tenants))
+
+	tt := make([]storage.TenantToken, len(tenants))
+	for i, t := range tenants {
+		accountID, projectID, err := auth.ParseToken(t)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse tenant token %q: %w", t, err)
+		}
+		tt[i].AccountID = accountID
+		tt[i].ProjectID = projectID
+	}
+
+	tenantsCacheV.put(tr, tt)
+	qt.Printf("put %d tenants into cache", len(tenants))
+
+	return tt, nil
+}
+
+var tenantsCacheV = newTenantsCache(*tenantsCacheDuration)
+
+type tenantsCacheItem struct {
+	tenants []storage.TenantToken
+	tr      storage.TimeRange
+	expires time.Time
+}
+
+type tenantsCache struct {
+	// items is used for intersection matches lookup
+	items []*tenantsCacheItem
+
+	itemExpiration time.Duration
+
+	requests atomic.Uint64
+	misses   atomic.Uint64
+
+	mu sync.Mutex
+}
+
+func newTenantsCache(expiration time.Duration) *tenantsCache {
+	tc := &tenantsCache{
+		items:          make([]*tenantsCacheItem, 0),
+		itemExpiration: expiration,
+	}
+
+	metrics.GetOrCreateGauge(`vm_cache_requests_total{type="multitenancy/tenants"}`, func() float64 {
+		return float64(tc.Requests())
+	})
+	metrics.GetOrCreateGauge(`vm_cache_misses_total{type="multitenancy/tenants"}`, func() float64 {
+		return float64(tc.Misses())
+	})
+	metrics.GetOrCreateGauge(`vm_cache_entries{type="multitenancy/tenants"}`, func() float64 {
+		return float64(tc.Len())
+	})
+
+	return tc
+}
+
+func (tc *tenantsCache) cleanupLocked() {
+	now := time.Now()
+	for i := len(tc.items) - 1; i >= 0; i-- {
+		if tc.items[i].expires.Before(now) {
+			tc.items = append(tc.items[:i], tc.items[i+1:]...)
+		}
+	}
+}
+
+func (tc *tenantsCache) put(tr storage.TimeRange, tenants []storage.TenantToken) {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	alignTrToDay(&tr)
+
+	exp := time.Now().Add(timeutil.AddJitterToDuration(tc.itemExpiration))
+
+	ci := &tenantsCacheItem{
+		tenants: tenants,
+		tr:      tr,
+		expires: exp,
+	}
+
+	tc.items = append(tc.items, ci)
+}
+
+func (tc *tenantsCache) Requests() uint64 {
+	return tc.requests.Load()
+}
+
+func (tc *tenantsCache) Misses() uint64 {
+	return tc.misses.Load()
+}
+
+func (tc *tenantsCache) Len() uint64 {
+	tc.mu.Lock()
+	n := len(tc.items)
+	tc.mu.Unlock()
+	return uint64(n)
+}
+
+func (tc *tenantsCache) get(tr storage.TimeRange) []storage.TenantToken {
+	tc.requests.Add(1)
+
+	alignTrToDay(&tr)
+	return tc.getInternal(tr)
+}
+
+func (tc *tenantsCache) getInternal(tr storage.TimeRange) []storage.TenantToken {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	if len(tc.items) == 0 {
+		return nil
+	}
+
+	result := make(map[storage.TenantToken]struct{})
+	cleanupNeeded := false
+	for idx := range tc.items {
+		ci := tc.items[idx]
+		if ci.expires.Before(time.Now()) {
+			cleanupNeeded = true
+		}
+
+		if hasIntersection(tr, ci.tr) {
+			for _, t := range ci.tenants {
+				result[t] = struct{}{}
+			}
+		}
+	}
+
+	if cleanupNeeded {
+		tc.cleanupLocked()
+	}
+
+	tenants := make([]storage.TenantToken, 0, len(result))
+	for t := range result {
+		tenants = append(tenants, t)
+	}
+
+	return tenants
+}
+
+// alignTrToDay aligns the given time range to day boundaries:
+// tr.MinTimestamp is set to the start of its day,
+// tr.MaxTimestamp is set to the end of its day.
+func alignTrToDay(tr *storage.TimeRange) {
+	tr.MinTimestamp = timeutil.StartOfDay(tr.MinTimestamp)
+	tr.MaxTimestamp = timeutil.EndOfDay(tr.MaxTimestamp)
+}
+
+// hasIntersection checks if there is any intersection of the given time ranges
+func hasIntersection(a, b storage.TimeRange) bool {
+	return a.MinTimestamp <= b.MaxTimestamp && a.MaxTimestamp >= b.MinTimestamp
+}
diff --git a/app/vmselect/netstorage/tenant_cache_test.go b/app/vmselect/netstorage/tenant_cache_test.go
new file mode 100644
index 000000000..6c77f8d70
--- /dev/null
+++ b/app/vmselect/netstorage/tenant_cache_test.go
@@ -0,0 +1,91 @@
+package netstorage
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+)
+
+func TestFetchingTenants(t *testing.T) {
+	tc := newTenantsCache(5 * time.Second)
+
+	dayMs := (24 * time.Hour).Milliseconds()
+
+	tc.put(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 0}, []storage.TenantToken{
+		{AccountID: 1, ProjectID: 1},
+		{AccountID: 1, ProjectID: 0},
+	})
+	tc.put(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: dayMs - 1}, []storage.TenantToken{
+		{AccountID: 1, ProjectID: 1},
+		{AccountID: 1, ProjectID: 0},
+	})
+	tc.put(storage.TimeRange{MinTimestamp: dayMs, MaxTimestamp: 2*dayMs - 1}, []storage.TenantToken{
+		{AccountID: 2, ProjectID: 1},
+		{AccountID: 2, ProjectID: 0},
+	})
+	tc.put(storage.TimeRange{MinTimestamp: 2 * dayMs, MaxTimestamp: 3*dayMs - 1}, []storage.TenantToken{
+		{AccountID: 3, ProjectID: 1},
+		{AccountID: 3, ProjectID: 0},
+	})
+
+	f := func(tr storage.TimeRange, expectedTenants []storage.TenantToken) {
+		t.Helper()
+		tenants := tc.get(tr)
+
+		if len(tenants) == 0 && len(expectedTenants) == 0 {
+			return
+		}
+
+		sortTenants := func(t []storage.TenantToken) func(i, j int) bool {
+			return func(i, j int) bool {
+				if t[i].AccountID == t[j].AccountID {
+					return t[i].ProjectID < t[j].ProjectID
+				}
+				return t[i].AccountID < t[j].AccountID
+			}
+		}
+		sort.Slice(tenants, sortTenants(tenants))
+		sort.Slice(expectedTenants, sortTenants(expectedTenants))
+
+		if !reflect.DeepEqual(tenants, expectedTenants) {
+			t.Fatalf("unexpected tenants; got %v; want %v", tenants, expectedTenants)
+		}
+	}
+
+	// Basic time range coverage
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 0}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 100}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: dayMs, MaxTimestamp: dayMs}, []storage.TenantToken{{AccountID: 2, ProjectID: 1}, {AccountID: 2, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: 2 * dayMs, MaxTimestamp: 2 * dayMs}, []storage.TenantToken{{AccountID: 3, ProjectID: 1}, {AccountID: 3, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: 3 * dayMs, MaxTimestamp: 3*dayMs + 1}, []storage.TenantToken{})
+
+	// Time range inside existing range
+	f(storage.TimeRange{MinTimestamp: dayMs / 2, MaxTimestamp: dayMs/2 + 100}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: dayMs + dayMs/2, MaxTimestamp: dayMs + dayMs/2 + 100}, []storage.TenantToken{{AccountID: 2, ProjectID: 1}, {AccountID: 2, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: dayMs / 2}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: dayMs / 2, MaxTimestamp: dayMs - 1}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+
+	// Overlapping time ranges
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 2*dayMs - 1}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}, {AccountID: 2, ProjectID: 1}, {AccountID: 2, ProjectID: 0}})
+	f(storage.TimeRange{MinTimestamp: dayMs / 2, MaxTimestamp: dayMs + dayMs/2}, []storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}, {AccountID: 2, ProjectID: 1}, {AccountID: 2, ProjectID: 0}})
+}
+
+func TestHasIntersection(t *testing.T) {
+	f := func(inner, outer storage.TimeRange, expected bool) {
+		t.Helper()
+		if hasIntersection(inner, outer) != expected {
+			t.Fatalf("unexpected result for inner=%+v, outer=%+v", inner, outer)
+		}
+	}
+
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 150}, storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 0}, true)
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 150}, storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 100}, true)
+	f(storage.TimeRange{MinTimestamp: 50, MaxTimestamp: 150}, storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 100}, true)
+	f(storage.TimeRange{MinTimestamp: 50, MaxTimestamp: 150}, storage.TimeRange{MinTimestamp: 10, MaxTimestamp: 80}, true)
+
+	f(storage.TimeRange{MinTimestamp: 0, MaxTimestamp: 50}, storage.TimeRange{MinTimestamp: 60, MaxTimestamp: 100}, false)
+	f(storage.TimeRange{MinTimestamp: 100, MaxTimestamp: 150}, storage.TimeRange{MinTimestamp: 60, MaxTimestamp: 80}, false)
+}
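The cache lookups above work because put and get align every time range to whole days before storing or matching, so any two queries touching the same day resolve to intersecting ranges. A standalone sketch with plain modulo arithmetic standing in for timeutil.StartOfDay/EndOfDay (assumes non-negative UTC millisecond timestamps):

```go
package main

import "fmt"

const dayMs = 24 * 60 * 60 * 1000

type timeRange struct{ min, max int64 }

// alignToDay mirrors alignTrToDay: widen the range to whole UTC days so
// lookups within the same day hit the same cached entries.
func alignToDay(tr *timeRange) {
	tr.min -= tr.min % dayMs
	tr.max += dayMs - 1 - tr.max%dayMs
}

// intersects mirrors hasIntersection: closed-interval overlap check.
func intersects(a, b timeRange) bool {
	return a.min <= b.max && a.max >= b.min
}

func main() {
	q := timeRange{min: dayMs / 2, max: dayMs/2 + 100}
	alignToDay(&q)
	cached := timeRange{min: 0, max: dayMs - 1}
	fmt.Println(q, intersects(q, cached)) // {0 86399999} true
}
```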
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" +) + +// GetTenantTokensFromFilters returns the list of tenant tokens and the list of filters without tenant filters. +func GetTenantTokensFromFilters(qt *querytracer.Tracer, tr storage.TimeRange, tfs [][]storage.TagFilter, deadline searchutils.Deadline) ([]storage.TenantToken, [][]storage.TagFilter, error) { + tenants, err := TenantsCached(qt, tr, deadline) + if err != nil { + return nil, nil, fmt.Errorf("cannot obtain tenants: %w", err) + } + + tenantFilters, otherFilters := splitFiltersByType(tfs) + + tts, err := applyFiltersToTenants(tenants, tenantFilters) + if err != nil { + return nil, nil, fmt.Errorf("cannot apply filters to tenants: %w", err) + } + + return tts, otherFilters, nil +} + +func splitFiltersByType(tfs [][]storage.TagFilter) ([][]storage.TagFilter, [][]storage.TagFilter) { + if len(tfs) == 0 { + return nil, tfs + } + + tenantFilters := make([][]storage.TagFilter, 0, len(tfs)) + otherFilters := make([][]storage.TagFilter, 0, len(tfs)) + for _, f := range tfs { + ffs := make([]storage.TagFilter, 0, len(f)) + offs := make([]storage.TagFilter, 0, len(f)) + for _, tf := range f { + if !isTenancyLabel(string(tf.Key)) { + offs = append(offs, tf) + continue + } + ffs = append(ffs, tf) + } + + if len(ffs) > 0 { + tenantFilters = append(tenantFilters, ffs) + } + if len(offs) > 0 { + otherFilters = append(otherFilters, offs) + } + } + return tenantFilters, otherFilters +} + +// ApplyTenantFiltersToTagFilters applies the given tenant filters to the given tag filters. +func ApplyTenantFiltersToTagFilters(tts []storage.TenantToken, tfs [][]storage.TagFilter) ([]storage.TenantToken, [][]storage.TagFilter) { + tenantFilters, otherFilters := splitFiltersByType(tfs) + if len(tenantFilters) == 0 { + return tts, otherFilters + } + + tts, err := applyFiltersToTenants(tts, tenantFilters) + if err != nil { + return nil, nil + } + return tts, otherFilters +} + +func tagFiltersToString(tfs []storage.TagFilter) string { + a := make([]string, len(tfs)) + for i, tf := range tfs { + a[i] = tf.String() + } + return "{" + strings.Join(a, ",") + "}" +} + +// applyFiltersToTenants applies the given filters to the given tenants. +// It returns the filtered tenants. 
+func applyFiltersToTenants(tenants []storage.TenantToken, filters [][]storage.TagFilter) ([]storage.TenantToken, error) {
+	// fast path - return all tenants if no filters are given
+	if len(filters) == 0 {
+		return tenants, nil
+	}
+
+	resultingTokens := make([]storage.TenantToken, 0, len(tenants))
+	lbs := make([][]prompbmarshal.Label, 0, len(tenants))
+	lbsAux := make([]prompbmarshal.Label, 0, len(tenants)*2)
+	for _, token := range tenants {
+		lbsAuxLen := len(lbsAux)
+		lbsAux = append(lbsAux, prompbmarshal.Label{
+			Name:  "vm_account_id",
+			Value: fmt.Sprintf("%d", token.AccountID),
+		}, prompbmarshal.Label{
+			Name:  "vm_project_id",
+			Value: fmt.Sprintf("%d", token.ProjectID),
+		})
+
+		lbs = append(lbs, lbsAux[lbsAuxLen:])
+	}
+
+	promIfs := make([]promrelabel.IfExpression, len(filters))
+	for i, tags := range filters {
+		filter := tagFiltersToString(tags)
+		err := promIfs[i].Parse(filter)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse if expression from filters %v: %s", filter, err)
+		}
+	}
+
+	for i, lb := range lbs {
+		for _, promIf := range promIfs {
+			if promIf.Match(lb) {
+				resultingTokens = append(resultingTokens, tenants[i])
+				break
+			}
+		}
+	}
+
+	return resultingTokens, nil
+}
+
+// isTenancyLabel returns true if the given label name is used for tenancy.
+func isTenancyLabel(name string) bool {
+	return name == "vm_account_id" || name == "vm_project_id"
+}
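applyFiltersToTenants reuses the relabeling engine: each filter group is rendered back into a series-selector string, parsed into a promrelabel.IfExpression, and matched against synthetic vm_account_id/vm_project_id labels. A usage sketch of that mechanism, assuming the import paths used by the file above:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)

func main() {
	// The same selector syntax that tagFiltersToString produces,
	// e.g. for {vm_account_id=~"1[0-9]+", vm_project_id!="0"}.
	var ie promrelabel.IfExpression
	if err := ie.Parse(`{vm_account_id=~"1[0-9]+",vm_project_id!="0"}`); err != nil {
		panic(err)
	}

	// A tenant token rendered as labels, as applyFiltersToTenants does.
	labels := []prompbmarshal.Label{
		{Name: "vm_account_id", Value: "12323"},
		{Name: "vm_project_id", Value: "3"},
	}
	fmt.Println(ie.Match(labels)) // true: account matches the regexp, project is not "0"
}
```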
diff --git a/app/vmselect/netstorage/tenant_filters_test.go b/app/vmselect/netstorage/tenant_filters_test.go
new file mode 100644
index 000000000..e2a10719c
--- /dev/null
+++ b/app/vmselect/netstorage/tenant_filters_test.go
@@ -0,0 +1,52 @@
+package netstorage
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+)
+
+func TestApplyFiltersToTenants(t *testing.T) {
+	f := func(filters [][]storage.TagFilter, tenants, expectedTenants []storage.TenantToken) {
+		t.Helper()
+		tenantsResult, err := applyFiltersToTenants(tenants, filters)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		if !reflect.DeepEqual(tenantsResult, expectedTenants) {
+			t.Fatalf("unexpected tenants result; got %v; want %v", tenantsResult, expectedTenants)
+		}
+	}
+
+	f(nil,
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+
+	f([][]storage.TagFilter{{{Key: []byte("vm_account_id"), Value: []byte("1")}}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}})
+	f([][]storage.TagFilter{{{Key: []byte("vm_account_id"), Value: []byte("1")}, {Key: []byte("vm_project_id"), Value: []byte("0")}}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 0}})
+
+	f([][]storage.TagFilter{{{Key: []byte("vm_account_id"), Value: []byte("1[0-9]+"), IsRegexp: true}}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 12323, ProjectID: 0}, {AccountID: 12323, ProjectID: 3}, {AccountID: 345, ProjectID: 0}},
+		[]storage.TenantToken{{AccountID: 12323, ProjectID: 0}, {AccountID: 12323, ProjectID: 3}})
+
+	f([][]storage.TagFilter{{{Key: []byte("vm_account_id"), Value: []byte("1")}, {Key: []byte("vm_project_id"), Value: []byte("0"), IsNegative: true}}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}, {AccountID: 1, ProjectID: 0}},
+		[]storage.TenantToken{{AccountID: 1, ProjectID: 1}})
+}
+
+func TestIsTenancyLabel(t *testing.T) {
+	f := func(label string, expected bool) {
+		t.Helper()
+		isTenancyLabel := isTenancyLabel(label)
+		if isTenancyLabel != expected {
+			t.Fatalf("unexpected result for label %q; got %v; want %v", label, isTenancyLabel, expected)
+		}
+	}
+
+	f("vm_account_id", true)
+	f("vm_project_id", true)
+
+	// The check is case-sensitive: upper-cased variants are not tenancy labels
+	f("VM_account_id", false)
+	f("VM_project_id", false)
+
+	// non-tenancy labels
+	f("job", false)
+	f("instance", false)
+}
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 5e761a69b..26f18991e 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -13,6 +13,11 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
+	"github.com/VictoriaMetrics/metricsql"
+
+	"github.com/valyala/fastjson/fastfloat"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
@@ -28,9 +33,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/metrics"
-	"github.com/VictoriaMetrics/metricsql"
-	"github.com/valyala/fastjson/fastfloat"
 )
 
 var (
@@ -124,7 +126,10 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
 	if cp.IsDefaultTimeRange() {
 		cp.start = cp.end - lookbackDelta
 	}
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxFederateSeries)
+	sq, err := getSearchQuery(nil, at, cp, *maxFederateSeries)
+	if err != nil {
+		return fmt.Errorf("cannot obtain search query: %w", err)
+	}
 	denyPartialResponse := httputils.GetDenyPartialResponse(r)
 	rss, isPartial, err := netstorage.ProcessSearchQuery(nil, denyPartialResponse, sq, cp.deadline)
 	if err != nil {
@@ -170,7 +175,10 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
 	fieldNames := strings.Split(format, ",")
 	reduceMemUsage := httputils.GetBool(r, "reduce_mem_usage")
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries)
+	sq, err := getSearchQuery(nil, at, cp, *maxExportSeries)
+	if err != nil {
+		return fmt.Errorf("cannot obtain search query: %w", err)
+	}
 	w.Header().Set("Content-Type", "text/csv; charset=utf-8")
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
@@ -250,7 +258,10 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
 		return err
 	}
 
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries)
+	sq, err := getSearchQuery(nil, at, cp, *maxExportSeries)
+	if err != nil {
+		return fmt.Errorf("cannot obtain search query: %w", err)
+	}
 	w.Header().Set("Content-Type", "VictoriaMetrics/native")
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
fmt.Errorf("cannot obtain search query: %w", err) + } + w.Header().Set("Content-Type", contentType) doneCh := make(chan error, 1) @@ -450,7 +465,7 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter doneCh <- err }() } - err := <-doneCh + err = <-doneCh if err != nil { return fmt.Errorf("cannot send data to remote client: %w", err) } @@ -597,12 +612,14 @@ func LabelValuesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.To return err } denyPartialResponse := httputils.GetDenyPartialResponse(r) - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxLabelsAPISeries) + sq, err := getSearchQuery(qt, at, cp, *maxLabelsAPISeries) + if err != nil { + return err + } labelValues, isPartial, err := netstorage.LabelValues(qt, denyPartialResponse, labelName, sq, limit, cp.deadline) if err != nil { return fmt.Errorf("cannot obtain values for label %q: %w", labelName, err) } - w.Header().Set("Content-Type", "application/json") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) @@ -661,9 +678,12 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok topN = n } denyPartialResponse := httputils.GetDenyPartialResponse(r) - start := int64(date*secsPerDay) * 1000 - end := int64((date+1)*secsPerDay)*1000 - 1 - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, cp.filterss, *maxTSDBStatusSeries) + cp.start = int64(date*secsPerDay) * 1000 + cp.end = int64((date+1)*secsPerDay)*1000 - 1 + sq, err := getSearchQuery(qt, at, cp, *maxTSDBStatusSeries) + if err != nil { + return err + } status, isPartial, err := netstorage.TSDBStatus(qt, denyPartialResponse, sq, focusLabel, topN, cp.deadline) if err != nil { return fmt.Errorf("cannot obtain tsdb stats: %w", err) @@ -696,7 +716,10 @@ func LabelsHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, return err } denyPartialResponse := httputils.GetDenyPartialResponse(r) - sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxLabelsAPISeries) + sq, err := getSearchQuery(qt, at, cp, *maxLabelsAPISeries) + if err != nil { + return err + } labels, isPartial, err := netstorage.LabelNames(qt, denyPartialResponse, sq, limit, cp.deadline) if err != nil { return fmt.Errorf("cannot obtain labels: %w", err) @@ -712,6 +735,18 @@ func LabelsHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, return nil } +func getSearchQuery(qt *querytracer.Tracer, at *auth.Token, cp *commonParams, maxSeries int) (*storage.SearchQuery, error) { + if at != nil { + return storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, maxSeries), nil + } + tt, tfs, err := netstorage.GetTenantTokensFromFilters(qt, storage.TimeRange{MinTimestamp: cp.start, MaxTimestamp: cp.end}, cp.filterss, cp.deadline) + if err != nil { + return nil, fmt.Errorf("cannot obtain tenant tokens: %w", err) + } + sq := storage.NewMultiTenantSearchQuery(tt, cp.start, cp.end, tfs, maxSeries) + return sq, nil +} + var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`) // SeriesCountHandler processes /api/v1/series/count request. 
 
 // SeriesCountHandler processes /api/v1/series/count request.
@@ -756,8 +791,10 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token,
 	if err != nil {
 		return err
 	}
-
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxSeriesLimit)
+	sq, err := getSearchQuery(qt, at, cp, *maxSeriesLimit)
+	if err != nil {
+		return err
+	}
 	denyPartialResponse := httputils.GetDenyPartialResponse(r)
 	metricNames, isPartial, err := netstorage.SearchMetricNames(qt, denyPartialResponse, sq, cp.deadline)
 	if err != nil {
@@ -883,7 +920,6 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
 	}
 	qs := &promql.QueryStats{}
 	ec := &promql.EvalConfig{
-		AuthToken: at,
 		Start:     start,
 		End:       start,
 		Step:      step,
@@ -902,6 +938,11 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
 		DenyPartialResponse: httputils.GetDenyPartialResponse(r),
 		QueryStats:          qs,
 	}
+	err = populateAuthTokens(qt, ec, at, deadline)
+	if err != nil {
+		return fmt.Errorf("cannot populate auth tokens: %w", err)
+	}
+
 	result, err := promql.Exec(qt, ec, query, true)
 	if err != nil {
 		return fmt.Errorf("error when executing query=%q for (time=%d, step=%d): %w", query, start, step, err)
@@ -993,7 +1034,6 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
 	qs := &promql.QueryStats{}
 	ec := &promql.EvalConfig{
-		AuthToken: at,
 		Start:     start,
 		End:       end,
 		Step:      step,
@@ -1012,6 +1052,11 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
 		DenyPartialResponse: httputils.GetDenyPartialResponse(r),
 		QueryStats:          qs,
 	}
+	err = populateAuthTokens(qt, ec, at, deadline)
+	if err != nil {
+		return fmt.Errorf("cannot populate auth tokens: %w", err)
+	}
+
 	result, err := promql.Exec(qt, ec, query, false)
 	if err != nil {
 		return err
@@ -1043,6 +1088,30 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
 	return nil
 }
 
+func populateAuthTokens(qt *querytracer.Tracer, ec *promql.EvalConfig, at *auth.Token, deadline searchutils.Deadline) error {
+	if at != nil {
+		ec.AuthTokens = []*auth.Token{at}
+		return nil
+	}
+
+	tt, tfs, err := netstorage.GetTenantTokensFromFilters(qt, storage.TimeRange{MinTimestamp: ec.Start, MaxTimestamp: ec.End}, ec.EnforcedTagFilterss, deadline)
+	if err != nil {
+		return fmt.Errorf("cannot obtain tenant tokens for the given search query: %w", err)
+	}
+	ec.EnforcedTagFilterss = tfs
+
+	ats := make([]*auth.Token, len(tt))
+	for i, t := range tt {
+		ats[i] = &auth.Token{
+			AccountID: t.AccountID,
+			ProjectID: t.ProjectID,
+		}
+	}
+	ec.AuthTokens = ats
+	ec.IsMultiTenant = true
+	return nil
+}
+
 func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
 	dst := tss[:0]
 	for i := range tss {
diff --git a/app/vmselect/promql/active_queries.go b/app/vmselect/promql/active_queries.go
index 9ee4cf152..91976b453 100644
--- a/app/vmselect/promql/active_queries.go
+++ b/app/vmselect/promql/active_queries.go
@@ -19,16 +19,16 @@ import (
 // If at is nil, then all the active queries across all the tenants are written.
 func ActiveQueriesHandler(at *auth.Token, w http.ResponseWriter, _ *http.Request) {
 	aqes := activeQueriesV.GetAll()
-	if at != nil {
-		// Filter out queries, which do not belong to at.
-		dst := aqes[:0]
-		for _, aqe := range aqes {
-			if aqe.accountID == at.AccountID && aqe.projectID == at.ProjectID {
-				dst = append(dst, aqe)
-			}
+
+	// Filter out queries, which do not belong to at.
+	// If at is nil, then all queries are returned for multi-tenant requests.
+	dst := aqes[:0]
+	for _, aqe := range aqes {
+		if at == nil || (aqe.accountID == at.AccountID && aqe.projectID == at.ProjectID) {
+			dst = append(dst, aqe)
 		}
-		aqes = dst
 	}
+	aqes = dst
 	writeActiveQueries(w, aqes)
 }
 
@@ -42,8 +42,8 @@ func writeActiveQueries(w http.ResponseWriter, aqes []activeQueryEntry) {
 	fmt.Fprintf(w, `{"status":"ok","data":[`)
 	for i, aqe := range aqes {
 		d := now.Sub(aqe.startTime)
-		fmt.Fprintf(w, `{"duration":"%.3fs","id":"%016X","remote_addr":%s,"account_id":"%d","project_id":"%d","query":%s,"start":%d,"end":%d,"step":%d}`,
-			d.Seconds(), aqe.qid, aqe.quotedRemoteAddr, aqe.accountID, aqe.projectID, stringsutil.JSONString(aqe.q), aqe.start, aqe.end, aqe.step)
+		fmt.Fprintf(w, `{"duration":"%.3fs","id":"%016X","remote_addr":%s,"account_id":"%d","project_id":"%d","query":%s,"start":%d,"end":%d,"step":%d,"is_multitenant":%v}`,
+			d.Seconds(), aqe.qid, aqe.quotedRemoteAddr, aqe.accountID, aqe.projectID, stringsutil.JSONString(aqe.q), aqe.start, aqe.end, aqe.step, aqe.isMultitenant)
 		if i+1 < len(aqes) {
 			fmt.Fprintf(w, `,`)
 		}
@@ -68,6 +68,7 @@ type activeQueryEntry struct {
 	quotedRemoteAddr string
 	q                string
 	startTime        time.Time
+	isMultitenant    bool
 }
 
 func newActiveQueries() *activeQueries {
@@ -78,8 +79,12 @@ func newActiveQueries() *activeQueries {
 
 func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 {
 	var aqe activeQueryEntry
-	aqe.accountID = ec.AuthToken.AccountID
-	aqe.projectID = ec.AuthToken.ProjectID
+	if ec.IsMultiTenant {
+		aqe.isMultitenant = true
+	} else {
+		aqe.accountID = ec.AuthTokens[0].AccountID
+		aqe.projectID = ec.AuthTokens[0].ProjectID
+	}
 	aqe.start = ec.Start
 	aqe.end = ec.End
 	aqe.step = ec.Step
diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 7d4bad6ea..69825a40a 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -12,6 +12,9 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/VictoriaMetrics/metrics"
+	"github.com/VictoriaMetrics/metricsql"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
@@ -26,8 +29,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
-	"github.com/VictoriaMetrics/metrics"
-	"github.com/VictoriaMetrics/metricsql"
 )
 
 var (
@@ -109,10 +110,12 @@ func alignStartEnd(start, end, step int64) (int64, int64) {
 
 // EvalConfig is the configuration required for query evaluation via Exec
 type EvalConfig struct {
-	AuthToken *auth.Token
-	Start     int64
-	End       int64
-	Step      int64
+	AuthTokens    []*auth.Token
+	IsMultiTenant bool
+
+	Start int64
+	End   int64
+	Step  int64
 
 	// MaxSeries is the maximum number of time series, which can be scanned by the query.
 	// Zero means 'no limit'
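Callers now hand EvalConfig either a single token (single tenant) or one token per discovered tenant plus IsMultiTenant. An illustrative helper mirroring populateAuthTokens from prometheus.go — not code from this change, just the shape callers follow:

```go
package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// newEvalConfig shows the two tenancy modes of EvalConfig: a single-tenant
// config carries exactly one auth token, while a multi-tenant config carries
// one token per discovered tenant and sets IsMultiTenant.
func newEvalConfig(at *auth.Token, tenants []storage.TenantToken, start, end, step int64) *promql.EvalConfig {
	ec := &promql.EvalConfig{Start: start, End: end, Step: step}
	if at != nil {
		ec.AuthTokens = []*auth.Token{at}
		return ec
	}
	ats := make([]*auth.Token, len(tenants))
	for i, t := range tenants {
		ats[i] = &auth.Token{AccountID: t.AccountID, ProjectID: t.ProjectID}
	}
	ec.AuthTokens = ats
	ec.IsMultiTenant = true
	return ec
}
```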
@@ -160,7 +163,8 @@ type EvalConfig struct {
 
 // copyEvalConfig returns src copy.
 func copyEvalConfig(src *EvalConfig) *EvalConfig {
 	var ec EvalConfig
-	ec.AuthToken = src.AuthToken
+	ec.AuthTokens = src.AuthTokens
+	ec.IsMultiTenant = src.IsMultiTenant
 	ec.Start = src.Start
 	ec.End = src.End
 	ec.Step = src.Step
@@ -963,7 +967,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 		return nil, nil
 	}
 	sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
-	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
+	preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant)
 	if err != nil {
 		return nil, err
 	}
@@ -1107,13 +1111,18 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 		}
 		return offset >= maxOffset
 	}
+
+	at := ec.AuthTokens[0]
+	if ec.IsMultiTenant {
+		at = nil
+	}
 	deleteCachedSeries := func(qt *querytracer.Tracer) {
-		rollupResultCacheV.DeleteInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
+		rollupResultCacheV.DeleteInstantValues(qt, at, expr, window, ec.Step, ec.EnforcedTagFilterss)
 	}
 	getCachedSeries := func(qt *querytracer.Tracer) ([]*timeseries, int64, error) {
 	again:
 		offset := int64(0)
-		tssCached := rollupResultCacheV.GetInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
+		tssCached := rollupResultCacheV.GetInstantValues(qt, at, expr, window, ec.Step, ec.EnforcedTagFilterss)
 		ec.QueryStats.addSeriesFetched(len(tssCached))
 		if len(tssCached) == 0 {
 			// Cache miss. Re-populate the missing data.
@@ -1139,7 +1148,7 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 			tss, err := evalAt(qt, timestamp, window)
 			return tss, 0, err
 		}
-		rollupResultCacheV.PutInstantValues(qt, ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
+		rollupResultCacheV.PutInstantValues(qt, at, expr, window, ec.Step, ec.EnforcedTagFilterss, tss)
 		return tss, offset, nil
 	}
 	// Cache hit. Verify whether it is OK to use the cached data.
@@ -1707,7 +1716,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
 	}
 	// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries) - preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps) + preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant) if err != nil { return nil, err } @@ -1724,7 +1733,18 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri } else { minTimestamp -= ec.Step } - sq := storage.NewSearchQuery(ec.AuthToken.AccountID, ec.AuthToken.ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries) + var sq *storage.SearchQuery + + if ec.IsMultiTenant { + ts := make([]storage.TenantToken, len(ec.AuthTokens)) + for i, at := range ec.AuthTokens { + ts[i].ProjectID = at.ProjectID + ts[i].AccountID = at.AccountID + } + sq = storage.NewMultiTenantSearchQuery(ts, minTimestamp, ec.End, tfss, ec.MaxSeries) + } else { + sq = storage.NewSearchQuery(ec.AuthTokens[0].AccountID, ec.AuthTokens[0].ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries) + } rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline) if err != nil { return nil, err @@ -1955,8 +1975,6 @@ var bbPool bytesutil.ByteBufferPool func evalNumber(ec *EvalConfig, n float64) []*timeseries { var ts timeseries ts.denyReuse = true - ts.MetricName.AccountID = ec.AuthToken.AccountID - ts.MetricName.ProjectID = ec.AuthToken.ProjectID timestamps := ec.getSharedTimestamps() values := make([]float64, len(timestamps)) for i := range timestamps { diff --git a/app/vmselect/promql/eval_test.go b/app/vmselect/promql/eval_test.go index 3182fbf53..b0adc5bec 100644 --- a/app/vmselect/promql/eval_test.go +++ b/app/vmselect/promql/eval_test.go @@ -4,9 +4,10 @@ import ( "reflect" "testing" + "github.com/VictoriaMetrics/metricsql" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/metricsql" ) func TestGetCommonLabelFilters(t *testing.T) { diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 4b84a9306..e75370f20 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -10,14 +10,15 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/metricsql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/metricsql" ) var ( @@ -39,10 +40,14 @@ var ( func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, error) { if querystats.Enabled() { startTime := time.Now() - ac := ec.AuthToken defer func() { - querystats.RegisterQuery(ac.AccountID, ac.ProjectID, q, ec.End-ec.Start, startTime) ec.QueryStats.addExecutionTimeMsec(startTime) + if ec.IsMultiTenant { + querystats.RegisterQueryMultiTenant(q, ec.End-ec.Start, startTime) + return + } + at := ec.AuthTokens[0] + querystats.RegisterQuery(at.AccountID, at.ProjectID, q, ec.End-ec.Start, startTime) }() } diff --git a/app/vmselect/promql/exec_test.go 
b/app/vmselect/promql/exec_test.go index 6db7ec11d..51825cbd5 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -65,10 +65,11 @@ func TestExecSuccess(t *testing.T) { f := func(q string, resultExpected []netstorage.Result) { t.Helper() ec := &EvalConfig{ - AuthToken: &auth.Token{ + AuthTokens: []*auth.Token{{ AccountID: accountID, ProjectID: projectID, - }, + }}, + Start: start, End: end, Step: step, @@ -82,7 +83,7 @@ func TestExecSuccess(t *testing.T) { if err != nil { t.Fatalf(`unexpected error when executing %q: %s`, q, err) } - testResultsEqual(t, result, resultExpected) + testResultsEqual(t, result, resultExpected, false) } } @@ -9345,10 +9346,10 @@ func TestExecError(t *testing.T) { f := func(q string) { t.Helper() ec := &EvalConfig{ - AuthToken: &auth.Token{ + AuthTokens: []*auth.Token{{ AccountID: 123, ProjectID: 567, - }, + }}, Start: 1000, End: 2000, Step: 100, @@ -9631,7 +9632,7 @@ func TestExecError(t *testing.T) { f(`rollup_candlestick(time(), "foo")`) } -func testResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) { +func testResultsEqual(t *testing.T, result, resultExpected []netstorage.Result, verifyTenant bool) { t.Helper() if len(result) != len(resultExpected) { t.Fatalf(`unexpected timeseries count; got %d; want %d`, len(result), len(resultExpected)) @@ -9639,17 +9640,17 @@ func testResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) for i := range result { r := &result[i] rExpected := &resultExpected[i] - testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, i) + testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, verifyTenant, i) testRowsEqual(t, r.Values, r.Timestamps, rExpected.Values, rExpected.Timestamps) } } -func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName, pos int) { +func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName, verifyTenant bool, pos int) { t.Helper() - if mn.AccountID != mnExpected.AccountID { + if verifyTenant && mn.AccountID != mnExpected.AccountID { t.Fatalf(`unexpected accountID; got %d; want %d`, mn.AccountID, mnExpected.AccountID) } - if mn.ProjectID != mnExpected.ProjectID { + if verifyTenant && mn.ProjectID != mnExpected.ProjectID { t.Fatalf(`unexpected projectID; got %d; want %d`, mn.ProjectID, mnExpected.ProjectID) } if string(mn.MetricGroup) != string(mnExpected.MetricGroup) { diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 66a0bf11c..ffd7fdd86 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -369,7 +369,7 @@ func getRollupTag(expr metricsql.Expr) (string, error) { } func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int, - window, lookbackDelta int64, sharedTimestamps []int64) ( + window, lookbackDelta int64, sharedTimestamps []int64, isMultiTenant bool) ( func(values []float64, timestamps []int64), []*rollupConfig, error) { preFunc := func(_ []float64, _ []int64) {} funcName = strings.ToLower(funcName) @@ -395,6 +395,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start Timestamps: sharedTimestamps, isDefaultRollup: funcName == "default_rollup", samplesScannedPerCall: samplesScannedPerCall, + isMultiTenant: isMultiTenant, } } @@ -586,6 +587,10 @@ type rollupConfig struct { // // If zero, then it is considered that Func scans all the samples passed to it. samplesScannedPerCall int + + // Whether the rollup is used in multi-tenant mode. 
+ // This is used in order to populate labels with tenancy information. + isMultiTenant bool } func (rc *rollupConfig) getTimestamps() []int64 { diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 0928cab7a..86a1678a6 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -9,6 +9,10 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/fastcache" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/metricsql" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -21,9 +25,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" - "github.com/VictoriaMetrics/fastcache" - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/metricsql" ) var ( @@ -176,7 +177,6 @@ func (rrc *rollupResultCache) GetInstantValues(qt *querytracer.Tracer, at *auth. // Obtain instant values from the cache bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalRollupResultCacheKeyForInstantValues(bb.B[:0], at, expr, window, step, etfss) tss, ok := rrc.getSeriesFromCache(qt, bb.B) if !ok || len(tss) == 0 { @@ -207,7 +207,6 @@ func (rrc *rollupResultCache) PutInstantValues(qt *querytracer.Tracer, at *auth. bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalRollupResultCacheKeyForInstantValues(bb.B[:0], at, expr, window, step, etfss) _ = rrc.putSeriesToCache(qt, bb.B, step, tss) } @@ -215,12 +214,10 @@ func (rrc *rollupResultCache) PutInstantValues(qt *querytracer.Tracer, at *auth. func (rrc *rollupResultCache) DeleteInstantValues(qt *querytracer.Tracer, at *auth.Token, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) { bb := bbPool.Get() defer bbPool.Put(bb) - bb.B = marshalRollupResultCacheKeyForInstantValues(bb.B[:0], at, expr, window, step, etfss) if !rrc.putSeriesToCache(qt, bb.B, step, nil) { logger.Panicf("BUG: cannot store zero series to cache") } - if qt.Enabled() { query := string(expr.AppendString(nil)) query = stringsutil.LimitStringLen(query, 300) @@ -239,8 +236,12 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig, // Obtain tss from the cache. 
 	bb := bbPool.Get()
 	defer bbPool.Put(bb)
+	at := ec.AuthTokens[0]
+	if ec.IsMultiTenant {
+		at = nil
+	}
 
-	bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
+	bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
 	metainfoBuf := rrc.c.Get(nil, bb.B)
 	if len(metainfoBuf) == 0 {
 		qt.Printf("nothing found")
@@ -262,7 +263,7 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig,
 	if !ok {
 		mi.RemoveKey(key)
 		metainfoBuf = mi.Marshal(metainfoBuf[:0])
-		bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
+		bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
 		rrc.c.Set(bb.B, metainfoBuf)
 		return nil, ec.Start
 	}
@@ -368,7 +369,11 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
 	metainfoBuf := bbPool.Get()
 	defer bbPool.Put(metainfoBuf)
 
-	metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
+	at := ec.AuthTokens[0]
+	if ec.IsMultiTenant {
+		at = nil
+	}
+	metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
 	metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B)
 	var mi rollupResultCacheMetainfo
 	if len(metainfoBuf.B) > 0 {
@@ -508,8 +513,10 @@ func marshalRollupResultCacheKeyForSeries(dst []byte, at *auth.Token, expr metri
 	dst = append(dst, rollupResultCacheVersion)
 	dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load())
 	dst = append(dst, rollupResultCacheTypeSeries)
-	dst = encoding.MarshalUint32(dst, at.AccountID)
-	dst = encoding.MarshalUint32(dst, at.ProjectID)
+	if at != nil {
+		dst = encoding.MarshalUint32(dst, at.AccountID)
+		dst = encoding.MarshalUint32(dst, at.ProjectID)
+	}
 	dst = encoding.MarshalInt64(dst, window)
 	dst = encoding.MarshalInt64(dst, step)
 	dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs)
@@ -521,8 +528,10 @@ func marshalRollupResultCacheKeyForInstantValues(dst []byte, at *auth.Token, exp
 	dst = append(dst, rollupResultCacheVersion)
 	dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load())
 	dst = append(dst, rollupResultCacheTypeInstantValues)
-	dst = encoding.MarshalUint32(dst, at.AccountID)
-	dst = encoding.MarshalUint32(dst, at.ProjectID)
+	if at != nil {
+		dst = encoding.MarshalUint32(dst, at.AccountID)
+		dst = encoding.MarshalUint32(dst, at.ProjectID)
+	}
 	dst = encoding.MarshalInt64(dst, window)
 	dst = encoding.MarshalInt64(dst, step)
 	dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs)
diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go
index 3aca342c2..c036e6630 100644
--- a/app/vmselect/promql/rollup_result_cache_test.go
+++ b/app/vmselect/promql/rollup_result_cache_test.go
@@ -4,10 +4,11 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/VictoriaMetrics/metricsql"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/metricsql"
 )
 
 func TestRollupResultCacheInitStop(t *testing.T) {
@@ -40,10 +41,10 @@ func TestRollupResultCache(t *testing.T) {
 		Step:               200,
 		MaxPointsPerSeries: 1e4,
 
-		AuthToken: &auth.Token{
+		AuthTokens: []*auth.Token{{
 			AccountID: 333,
 			ProjectID: 843,
-		},
+		}},
 
 		MayCache: true,
 	}
 
@@ -322,7
+323,60 @@ func TestRollupResultCache(t *testing.T) { } testTimeseriesEqual(t, tss, tssExpected) }) + t.Run("multi-tenant cache can be retrieved", func(t *testing.T) { + ResetRollupResultCache() + tssGolden := []*timeseries{ + { + MetricName: storage.MetricName{ + AccountID: 0, + ProjectID: 0, + }, + Timestamps: []int64{800, 1000, 1200}, + Values: []float64{0, 1, 2}, + }, + { + MetricName: storage.MetricName{ + AccountID: 0, + ProjectID: 1, + }, + Timestamps: []int64{800, 1000, 1200}, + Values: []float64{0, 1, 2}, + }, + { + MetricName: storage.MetricName{ + AccountID: 1, + ProjectID: 1, + }, + Timestamps: []int64{800, 1000, 1200}, + Values: []float64{0, 1, 2}, + }, + } + ecL := copyEvalConfig(ec) + ecL.Start = 800 + ecL.AuthTokens = []*auth.Token{ + { + AccountID: 0, + ProjectID: 0, + }, + { + AccountID: 0, + ProjectID: 1, + }, + { + AccountID: 1, + ProjectID: 1, + }, + } + ecL.IsMultiTenant = true + rollupResultCacheV.PutSeries(nil, ecL, fe, window, tssGolden) + tss, newStart := rollupResultCacheV.GetSeries(nil, ecL, fe, window) + if newStart != 1400 { + t.Fatalf("unexpected newStart; got %d; want %d", newStart, 1400) + } + + testTimeseriesEqual(t, tss, tssGolden) + }) } func TestMergeSeries(t *testing.T) { @@ -511,7 +565,7 @@ func testTimeseriesEqual(t *testing.T, tss, tssExpected []*timeseries) { } for i, ts := range tss { tsExpected := tssExpected[i] - testMetricNamesEqual(t, &ts.MetricName, &tsExpected.MetricName, i) + testMetricNamesEqual(t, &ts.MetricName, &tsExpected.MetricName, true, i) testRowsEqual(t, ts.Values, ts.Timestamps, tsExpected.Values, tsExpected.Timestamps) } } diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go index be0d69c92..f8fd735b9 100644 --- a/app/vmselect/promql/timeseries.go +++ b/app/vmselect/promql/timeseries.go @@ -287,8 +287,9 @@ func marshalMetricTagsFast(dst []byte, tags []storage.Tag) []byte { } func marshalMetricNameSorted(dst []byte, mn *storage.MetricName) []byte { - // Do not marshal AccountID and ProjectID, since they are unused. dst = marshalBytesFast(dst, mn.MetricGroup) + dst = encoding.MarshalUint32(dst, mn.AccountID) + dst = encoding.MarshalUint32(dst, mn.ProjectID) return marshalMetricTagsSorted(dst, mn) } diff --git a/app/vmselect/querystats/querystats.go b/app/vmselect/querystats/querystats.go index 30fd5d175..c2f9a8928 100644 --- a/app/vmselect/querystats/querystats.go +++ b/app/vmselect/querystats/querystats.go @@ -36,6 +36,14 @@ func RegisterQuery(accountID, projectID uint32, query string, timeRangeMsecs int qsTracker.registerQuery(accountID, projectID, query, timeRangeMsecs, startTime) } +// RegisterQueryMultiTenant registers the query on the given timeRangeMsecs, which has been started at startTime. +// +// RegisterQueryMultiTenant must be called when the query is finished. +func RegisterQueryMultiTenant(query string, timeRangeMsecs int64, startTime time.Time) { + initOnce.Do(initQueryStats) + qsTracker.registerQueryMultiTenant(query, timeRangeMsecs, startTime) +} + // WriteJSONQueryStats writes query stats to given writer in json format. 
func WriteJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { initOnce.Do(initQueryStats) @@ -66,6 +74,7 @@ type queryStatRecord struct { timeRangeSecs int64 registerTime time.Time duration time.Duration + multiTenant bool } type queryStatKey struct { @@ -73,6 +82,7 @@ type queryStatKey struct { projectID uint32 query string timeRangeSecs int64 + multiTenant bool } type accountProjectFilter struct { @@ -100,7 +110,7 @@ func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, apFilte fmt.Fprintf(w, `"topByCount":[`) topByCount := qst.getTopByCount(topN, apFilter, maxLifetime) for i, r := range topByCount { - fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%s,"timeRangeSeconds":%d,"count":%d}`, r.accountID, r.projectID, stringsutil.JSONString(r.query), r.timeRangeSecs, r.count) + fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%s,"timeRangeSeconds":%d,"count":%d,"multiTenant":%v}`, r.accountID, r.projectID, stringsutil.JSONString(r.query), r.timeRangeSecs, r.count, r.multiTenant) if i+1 < len(topByCount) { fmt.Fprintf(w, `,`) } @@ -108,8 +118,8 @@ func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, apFilte fmt.Fprintf(w, `],"topByAvgDuration":[`) topByAvgDuration := qst.getTopByAvgDuration(topN, apFilter, maxLifetime) for i, r := range topByAvgDuration { - fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%s,"timeRangeSeconds":%d,"avgDurationSeconds":%.3f,"count":%d}`, - r.accountID, r.projectID, stringsutil.JSONString(r.query), r.timeRangeSecs, r.duration.Seconds(), r.count) + fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%s,"timeRangeSeconds":%d,"avgDurationSeconds":%.3f,"count":%d,"multiTenant": %v}`, + r.accountID, r.projectID, stringsutil.JSONString(r.query), r.timeRangeSecs, r.duration.Seconds(), r.count, r.multiTenant) if i+1 < len(topByAvgDuration) { fmt.Fprintf(w, `,`) } @@ -117,8 +127,8 @@ func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, apFilte fmt.Fprintf(w, `],"topBySumDuration":[`) topBySumDuration := qst.getTopBySumDuration(topN, apFilter, maxLifetime) for i, r := range topBySumDuration { - fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%s,"timeRangeSeconds":%d,"sumDurationSeconds":%.3f,"count":%d}`, - r.accountID, r.projectID, stringsutil.JSONString(r.query), r.timeRangeSecs, r.duration.Seconds(), r.count) + fmt.Fprintf(w, `{"accountID":%d,"projectID":%d,"query":%s,"timeRangeSeconds":%d,"sumDurationSeconds":%.3f,"count":%d,"multiTenant":%v}`, + r.accountID, r.projectID, stringsutil.JSONString(r.query), r.timeRangeSecs, r.duration.Seconds(), r.count, r.multiTenant) if i+1 < len(topBySumDuration) { fmt.Fprintf(w, `,`) } @@ -151,6 +161,30 @@ func (qst *queryStatsTracker) registerQuery(accountID, projectID uint32, query s r.duration = duration } +func (qst *queryStatsTracker) registerQueryMultiTenant(query string, timeRangeMsecs int64, startTime time.Time) { + registerTime := time.Now() + duration := registerTime.Sub(startTime) + if duration < *minQueryDuration { + return + } + + qst.mu.Lock() + defer qst.mu.Unlock() + + a := qst.a + idx := qst.nextIdx + if idx >= uint(len(a)) { + idx = 0 + } + qst.nextIdx = idx + 1 + r := &a[idx] + r.multiTenant = true + r.query = query + r.timeRangeSecs = timeRangeMsecs / 1000 + r.registerTime = registerTime + r.duration = duration +} + func (r *queryStatRecord) matches(apFilter *accountProjectFilter, currentTime time.Time, maxLifetime time.Duration) bool { if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { return 
false
@@ -167,6 +201,7 @@ func (r *queryStatRecord) key() queryStatKey {
 		projectID:     r.projectID,
 		query:         r.query,
 		timeRangeSecs: r.timeRangeSecs,
+		multiTenant:   r.multiTenant,
 	}
 }
 
@@ -190,6 +225,7 @@ func (qst *queryStatsTracker) getTopByCount(topN int, apFilter *accountProjectFi
 			query:         k.query,
 			timeRangeSecs: k.timeRangeSecs,
 			count:         count,
+			multiTenant:   k.multiTenant,
 		})
 	}
 	sort.Slice(a, func(i, j int) bool {
@@ -207,6 +243,7 @@ type queryStatByCount struct {
 	query         string
 	timeRangeSecs int64
 	count         int
+	multiTenant   bool
 }
 
 func (qst *queryStatsTracker) getTopByAvgDuration(topN int, apFilter *accountProjectFilter, maxLifetime time.Duration) []queryStatByDuration {
@@ -237,6 +274,7 @@ func (qst *queryStatsTracker) getTopByAvgDuration(topN int, apFilter *accountPro
 			timeRangeSecs: k.timeRangeSecs,
 			duration:      ks.sum / time.Duration(ks.count),
 			count:         ks.count,
+			multiTenant:   k.multiTenant,
 		})
 	}
 	sort.Slice(a, func(i, j int) bool {
@@ -255,6 +293,7 @@ type queryStatByDuration struct {
 	timeRangeSecs int64
 	duration      time.Duration
 	count         int
+	multiTenant   bool
 }
 
 func (qst *queryStatsTracker) getTopBySumDuration(topN int, apFilter *accountProjectFilter, maxLifetime time.Duration) []queryStatByDuration {
@@ -285,6 +324,7 @@ func (qst *queryStatsTracker) getTopBySumDuration(topN int, apFilter *accountPro
 			timeRangeSecs: k.timeRangeSecs,
 			duration:      kd.sum,
 			count:         kd.count,
+			multiTenant:   k.multiTenant,
 		})
 	}
 	sort.Slice(a, func(i, j int) bool {
diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md
index 50707cbbc..2c882faa6 100644
--- a/docs/Cluster-VictoriaMetrics.md
+++ b/docs/Cluster-VictoriaMetrics.md
@@ -113,8 +113,55 @@ such as [Graphite](https://docs.victoriametrics.com/#how-to-send-data-from-graph
 [InfluxDB line protocol via TCP and UDP](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
 and [OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol).
 
+
+`vmselect` can execute queries over multiple [tenants](#multitenancy) via special `multitenant` endpoints `http://vmselect:8481/select/multitenant/<suffix>`.
+Currently supported endpoints for `<suffix>` are:
+- `/prometheus/api/v1/query`
+- `/prometheus/api/v1/query_range`
+- `/prometheus/api/v1/series`
+- `/prometheus/api/v1/labels`
+- `/prometheus/api/v1/label/<label_name>/values`
+- `/prometheus/api/v1/status/active_queries`
+- `/prometheus/api/v1/status/top_queries`
+- `/prometheus/api/v1/status/tsdb`
+- `/prometheus/api/v1/export`
+- `/prometheus/api/v1/export/csv`
+- `/vmui`
+
+It is possible to explicitly specify `accountID` and `projectID` for querying multiple tenants via `vm_account_id` and `vm_project_id` labels in the query.
+Alternatively, it is possible to use [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements)
+query args to apply additional filters for the query.
+
+For example, the following query fetches `up` time series for the tenants `accountID=42` and `accountID=7, projectID=9`:
+```
+up{vm_account_id="7", vm_project_id="9" or vm_account_id="42"}
+```
+
+In order to achieve the same via `extra_filters[]` and `extra_label` query args, the following request must be used:
+```
+curl 'http://vmselect:8481/select/multitenant/prometheus/api/v1/query' \
+    -d 'query=up' \
+    -d 'extra_filters[]={vm_account_id="7",vm_project_id="9"}' \
+    -d 'extra_filters[]={vm_account_id="42"}'
+```
+
+The precedence for applying tenant filters follows this order:
+
+1. Filters on tenants from the `extra_label` and `extra_filters[]` query args.
+   These filters have the highest priority and are applied first when provided via the query args.
+
+2. Filters on tenants from label selectors defined in the MetricsQL query expression.
+
+
+Note that `vm_account_id` and `vm_project_id` labels support all operators for label matching. For example:
+```
+up{vm_account_id!="42"} # selects all the time series except those belonging to accountID=42
+up{vm_account_id=~"4.*"} # selects all the time series belonging to accountIDs starting with 4
+```
+
 **Security considerations:** it is recommended restricting access to `multitenant` endpoints only to trusted sources,
-since untrusted source may break per-tenant data by writing unwanted samples to arbitrary tenants.
+since an untrusted source may break per-tenant data by writing unwanted samples or gain access to the data of arbitrary tenants.
 
 ## Binaries
 
@@ -1596,6 +1643,8 @@ Below is the output for `/path/to/vmselect -help`:
   -search.inmemoryBufSizeBytes size
     	Size for in-memory data blocks used during processing search requests. By default, the size is automatically calculated based on available memory. Adjust this flag value if you observe that vm_tmp_blocks_max_inmemory_file_size_bytes metric constantly shows much higher values than vm_tmp_blocks_inmemory_file_size_bytes. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6851 Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
+  -search.tenantCacheExpireDuration duration
+    	The expiry duration for list of tenants for multi-tenant queries. (default 5m0s)
   -search.treatDotsAsIsInRegexps
     	Whether to treat dots as is in regexp label filters used in queries. For example, foo{bar=~"a.b.c"} will be automatically converted to foo{bar=~"a\\.b\\.c"}, i.e. all the dots in regexp filters will be automatically escaped in order to match only dot char instead of matching any char. Dots in ".+", ".*" and ".{n}" regexps aren't escaped. This option is DEPRECATED in favor of {__graphite__="a.*.c"} syntax for selecting metrics matching the given Graphite metrics filter
   -selectNode array
diff --git a/lib/auth/auth.go b/lib/auth/auth.go
index 9a7a4346f..893515d59 100644
--- a/lib/auth/auth.go
+++ b/lib/auth/auth.go
@@ -44,25 +44,35 @@ func NewTokenPossibleMultitenant(authToken string) (*Token, error) {
 
 // Init initializes t from authToken.
 func (t *Token) Init(authToken string) error {
+	accountID, projectID, err := ParseToken(authToken)
+	if err != nil {
+		return fmt.Errorf("cannot parse authToken %q: %w", authToken, err)
+	}
+
+	t.Set(accountID, projectID)
+	return nil
+}
+
+// ParseToken parses authToken and returns accountID and projectID from it.
+func ParseToken(authToken string) (uint32, uint32, error) { tmp := strings.Split(authToken, ":") if len(tmp) > 2 { - return fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp)) + return 0, 0, fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp)) } n, err := strconv.ParseUint(tmp[0], 10, 32) if err != nil { - return fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err) + return 0, 0, fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err) } accountID := uint32(n) projectID := uint32(0) if len(tmp) > 1 { n, err := strconv.ParseUint(tmp[1], 10, 32) if err != nil { - return fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err) + return 0, 0, fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err) } projectID = uint32(n) } - t.Set(accountID, projectID) - return nil + return accountID, projectID, nil } // Set sets accountID and projectID for the t. diff --git a/lib/storage/search.go b/lib/storage/search.go index c51bc7224..1e8de9976 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -277,6 +277,9 @@ type SearchQuery struct { AccountID uint32 ProjectID uint32 + TenantTokens []TenantToken + IsMultiTenant bool + // The time range for searching time series MinTimestamp int64 MaxTimestamp int64 @@ -306,12 +309,53 @@ func NewSearchQuery(accountID, projectID uint32, start, end int64, tagFilterss [ maxMetrics = 2e9 } return &SearchQuery{ - AccountID: accountID, - ProjectID: projectID, MinTimestamp: start, MaxTimestamp: end, TagFilterss: tagFilterss, MaxMetrics: maxMetrics, + TenantTokens: []TenantToken{ + { + AccountID: accountID, + ProjectID: projectID, + }, + }, + } +} + +// TenantToken represents a tenant (accountID, projectID) pair. +type TenantToken struct { + AccountID uint32 + ProjectID uint32 +} + +// String returns string representation of t. +func (t *TenantToken) String() string { + return fmt.Sprintf("{accountID=%d, projectID=%d}", t.AccountID, t.ProjectID) +} + +// Marshal appends marshaled t to dst and returns the result. +func (t *TenantToken) Marshal(dst []byte) []byte { + dst = encoding.MarshalUint32(dst, t.AccountID) + dst = encoding.MarshalUint32(dst, t.ProjectID) + return dst +} + +// NewMultiTenantSearchQuery creates new search query for the given args. 
+func NewMultiTenantSearchQuery(tenants []TenantToken, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery { + if start < 0 { + // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5553 + start = 0 + } + if maxMetrics <= 0 { + maxMetrics = 2e9 + } + return &SearchQuery{ + TenantTokens: tenants, + MinTimestamp: start, + MaxTimestamp: end, + TagFilterss: tagFilterss, + MaxMetrics: maxMetrics, + IsMultiTenant: true, } } @@ -412,7 +456,15 @@ func (sq *SearchQuery) String() string { } start := TimestampToHumanReadableFormat(sq.MinTimestamp) end := TimestampToHumanReadableFormat(sq.MaxTimestamp) - return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end) + if !sq.IsMultiTenant { + return fmt.Sprintf("accountID=%d, projectID=%d, filters=%s, timeRange=[%s..%s]", sq.AccountID, sq.ProjectID, a, start, end) + } + + tts := make([]string, len(sq.TenantTokens)) + for i, tt := range sq.TenantTokens { + tts[i] = tt.String() + } + return fmt.Sprintf("tenants=[%s], filters=%s, timeRange=[%s..%s]", strings.Join(tts, ","), a, start, end) } func tagFiltersToString(tfs []TagFilter) string { @@ -423,10 +475,9 @@ func tagFiltersToString(tfs []TagFilter) string { return "{" + strings.Join(a, ",") + "}" } -// Marshal appends marshaled sq to dst and returns the result. -func (sq *SearchQuery) Marshal(dst []byte) []byte { - dst = encoding.MarshalUint32(dst, sq.AccountID) - dst = encoding.MarshalUint32(dst, sq.ProjectID) +// MarshaWithoutTenant appends marshaled sq without AccountID/ProjectID to dst and returns the result. +// It is expected that TenantToken is already marshaled to dst. +func (sq *SearchQuery) MarshaWithoutTenant(dst []byte) []byte { dst = encoding.MarshalVarInt64(dst, sq.MinTimestamp) dst = encoding.MarshalVarInt64(dst, sq.MaxTimestamp) dst = encoding.MarshalVarUint64(dst, uint64(len(sq.TagFilterss))) diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 7dfa70740..29b371a61 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -29,7 +29,12 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { // Skip nil sq1. continue } - buf = sq1.Marshal(buf[:0]) + tt := TenantToken{ + AccountID: sq1.AccountID, + ProjectID: sq1.ProjectID, + } + buf = tt.Marshal(buf[:0]) + buf = sq1.MarshaWithoutTenant(buf) tail, err := sq2.Unmarshal(buf) if err != nil { diff --git a/lib/tenantmetrics/counter_map.go b/lib/tenantmetrics/counter_map.go index cc47c423c..9b8759779 100644 --- a/lib/tenantmetrics/counter_map.go +++ b/lib/tenantmetrics/counter_map.go @@ -4,9 +4,10 @@ import ( "fmt" "sync/atomic" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/metrics" ) // TenantID defines metric tenant. @@ -21,6 +22,8 @@ type CounterMap struct { // do not use atomic.Pointer, since the stored map there is already a pointer type. m atomic.Value + // mt holds value for multi-tenant metrics. + mt atomic.Value } // NewCounterMap creates new CounterMap for the given metric. 
@@ -34,11 +37,15 @@ func NewCounterMap(metric string) *CounterMap { // Get returns counter for the given at func (cm *CounterMap) Get(at *auth.Token) *metrics.Counter { + if at == nil { + return cm.GetByTenant(nil) + } + key := TenantID{ AccountID: at.AccountID, ProjectID: at.ProjectID, } - return cm.GetByTenant(key) + return cm.GetByTenant(&key) } // MultiAdd adds multiple values grouped by auth.Token @@ -49,9 +56,19 @@ func (cm *CounterMap) MultiAdd(perTenantValues map[auth.Token]int) { } // GetByTenant returns counter for the given key. -func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter { +func (cm *CounterMap) GetByTenant(key *TenantID) *metrics.Counter { + if key == nil { + mtm := cm.mt.Load() + if mtm == nil { + mtc := metrics.GetOrCreateCounter(createMetricNameMultitenant(cm.metric)) + cm.mt.Store(mtc) + return mtc + } + return mtm.(*metrics.Counter) + } + m := cm.m.Load().(map[TenantID]*metrics.Counter) - if c := m[key]; c != nil { + if c := m[*key]; c != nil { // Fast path - the counter for k already exists. return c } @@ -61,9 +78,9 @@ func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter { for k, c := range m { newM[k] = c } - metricName := createMetricName(cm.metric, key) + metricName := createMetricName(cm.metric, *key) c := metrics.GetOrCreateCounter(metricName) - newM[key] = c + newM[*key] = c cm.m.Store(newM) return c } @@ -79,3 +96,15 @@ func createMetricName(metric string, key TenantID) string { // Metric with labels. return fmt.Sprintf(`%s,accountID="%d",projectID="%d"}`, metric[:len(metric)-1], key.AccountID, key.ProjectID) } + +func createMetricNameMultitenant(metric string) string { + if len(metric) == 0 { + logger.Panicf("BUG: metric cannot be empty") + } + if metric[len(metric)-1] != '}' { + // Metric without labels. + return fmt.Sprintf(`%s{accountID="multitenant",projectID="multitenant"}`, metric) + } + // Metric with labels. + return fmt.Sprintf(`%s,accountID="multitenant",projectID="multitenant"}`, metric[:len(metric)-1]) +} diff --git a/lib/timeutil/timeutil.go b/lib/timeutil/timeutil.go index 919bd0f46..d5df1a070 100644 --- a/lib/timeutil/timeutil.go +++ b/lib/timeutil/timeutil.go @@ -17,3 +17,15 @@ func AddJitterToDuration(d time.Duration) time.Duration { p := float64(fastrand.Uint32()) / (1 << 32) return d + time.Duration(p*float64(dv)) } + +// StartOfDay returns the start of the day for the given timestamp. +// Timestamp is in milliseconds. +func StartOfDay(ts int64) int64 { + return ts - (ts % 86400000) +} + +// EndOfDay returns the end of the day for the given timestamp. +// Timestamp is in milliseconds. 
+func EndOfDay(ts int64) int64 { + return StartOfDay(ts) + 86400000 - 1 +} diff --git a/lib/timeutil/timeutil_test.go b/lib/timeutil/timeutil_test.go index 55032fb35..8cb428f0f 100644 --- a/lib/timeutil/timeutil_test.go +++ b/lib/timeutil/timeutil_test.go @@ -25,3 +25,45 @@ func TestAddJitterToDuration(t *testing.T) { f(time.Hour) f(24 * time.Hour) } + +func TestStartOfDay(t *testing.T) { + f := func(original, expected time.Time) { + t.Helper() + + result := StartOfDay(original.UnixMilli()) + if result != expected.UnixMilli() { + t.Fatalf("unexpected result; got %d; want %d", result, expected.UnixMilli()) + } + } + + f( + time.Date(2021, 1, 1, 1, 1, 1, 0, time.UTC), + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + ) + + f( + time.Date(2021, 1, 1, 23, 59, 59, 999999999, time.UTC), + time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + ) +} + +func TestEndOfDay(t *testing.T) { + f := func(original, expected time.Time) { + t.Helper() + + result := EndOfDay(original.UnixMilli()) + if result != expected.UnixMilli() { + t.Fatalf("unexpected result; got %d; want %d", result, expected.UnixMilli()) + } + } + + f( + time.Date(2021, 1, 1, 1, 1, 1, 0, time.UTC), + time.Date(2021, 1, 1, 23, 59, 59, 999999999, time.UTC), + ) + + f( + time.Date(2021, 1, 1, 23, 59, 59, 999999999, time.UTC), + time.Date(2021, 1, 1, 23, 59, 59, 999999999, time.UTC), + ) +}
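
Reviewer note: the per-tenant wire format introduced in `lib/storage/search.go` is a `TenantToken` header followed by the tenant-agnostic body produced by `MarshaWithoutTenant` (whose doc comment requires the `TenantToken` to be marshaled to `dst` first). The updated `TestSearchQueryMarshalUnmarshal` composes its buffer exactly this way before calling `Unmarshal`. Below is a minimal sketch of how a caller might build one request payload per tenant from a multi-tenant `SearchQuery`; the `buildTenantRequests` helper is hypothetical, included for illustration only, and is not part of this diff.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// buildTenantRequests is a hypothetical helper: for every tenant in sq it
// emits the TenantToken header followed by the tenant-agnostic query body,
// mirroring the buffer layout exercised by TestSearchQueryMarshalUnmarshal.
func buildTenantRequests(sq *storage.SearchQuery) [][]byte {
	reqs := make([][]byte, 0, len(sq.TenantTokens))
	for i := range sq.TenantTokens {
		var buf []byte
		buf = sq.TenantTokens[i].Marshal(buf)
		buf = sq.MarshaWithoutTenant(buf)
		reqs = append(reqs, buf)
	}
	return reqs
}

func main() {
	tenants := []storage.TenantToken{
		{AccountID: 7, ProjectID: 9},
		{AccountID: 42, ProjectID: 0},
	}
	// Multi-tenant query over [0 .. 1000] with no tag filters; maxMetrics <= 0
	// falls back to the 2e9 default inside NewMultiTenantSearchQuery.
	sq := storage.NewMultiTenantSearchQuery(tenants, 0, 1000, nil, 0)
	for i, req := range buildTenantRequests(sq) {
		fmt.Printf("tenant %s: %d bytes\n", sq.TenantTokens[i].String(), len(req))
	}
}
```

Marshaling the tenant header separately from the query body keeps the body identical across tenants, so a per-tenant request differs only in its first eight bytes (accountID and projectID as two uint32 values).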