From be87be34a4f6f681b0d64ddff2d873b392b07744 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 12 May 2021 15:18:45 +0300 Subject: [PATCH] Adds tsdb match filters (#1282) * init work on filters * init propose for status filters * fixes tsdb status adds test * fix bug * removes checks from test --- README.md | 4 +- app/vmselect/netstorage/netstorage.go | 111 +++++++++++++++++ app/vmselect/prometheus/prometheus.go | 42 ++++++- app/vmstorage/transport/server.go | 47 +++++++ docs/CHANGELOG.md | 1 + docs/Cluster-VictoriaMetrics.md | 4 +- docs/Single-server-VictoriaMetrics.md | 19 +-- lib/storage/index_db.go | 173 +++++++++++++++++++++++--- lib/storage/index_db_test.go | 22 ++++ lib/storage/storage.go | 5 + 10 files changed, 397 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 8f970ab71..51ba6b034 100644 --- a/README.md +++ b/README.md @@ -197,9 +197,7 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co - `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above). - `api/v1/export/csv` - exports data in CSV. It may be imported into another VictoriaMetrics via `api/v1/import/csv` (see above). - `api/v1/series/count` - returns the total number of series. - - `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details. - VictoriaMetrics accepts optional `topN=N` and `date=YYYY-MM-DD` query args for this handler, where `N` is the number of top entries to return in the response - and `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. + - `api/v1/status/tsdb` - for time series stats. See [these docs](https://docs.victoriametrics.com/#tsdb-stats) for details. - `api/v1/status/active_queries` - for currently executed active queries. 
Note that every `vmselect` maintains an independent list of active queries, which is returned in the response. - `api/v1/status/top_queries` - for listing the most frequently executed queries and queries taking the most duration. diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 730659eab..fa751f566 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1047,6 +1047,50 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry { return a } +// GetTSDBStatusWithFilters returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats +// +// It accepts aribtrary filters on time series in sq. +func GetTSDBStatusWithFilters(at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline, sq *storage.SearchQuery, topN int) (*storage.TSDBStatus, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + requestData := sq.Marshal(nil) + // Send the query to all the storage nodes in parallel. + type nodeResult struct { + status *storage.TSDBStatus + err error + } + snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} { + sn.tsdbStatusWithFiltersRequests.Inc() + status, err := sn.getTSDBStatusWithFilters(requestData, topN, deadline) + if err != nil { + sn.tsdbStatusWithFiltersErrors.Inc() + err = fmt.Errorf("cannot obtain tsdb status with filters from vmstorage %s: %w", sn.connPool.Addr(), err) + } + return &nodeResult{ + status: status, + err: err, + } + }) + + // Collect results. 
+ var statuses []*storage.TSDBStatus + isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result interface{}) error { + nr := result.(*nodeResult) + if nr.err != nil { + return nr.err + } + statuses = append(statuses, nr.status) + return nil + }) + if err != nil { + return nil, isPartial, fmt.Errorf("cannot fetch tsdb status with filters from vmstorage nodes: %w", err) + } + + status := mergeTSDBStatuses(statuses, topN) + return status, isPartial, nil +} + // GetSeriesCount returns the number of unique series for the given at. func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline) (uint64, bool, error) { if deadline.Exceeded() { @@ -1462,6 +1506,12 @@ type storageNode struct { // The number of errors during requests to tsdb status. tsdbStatusErrors *metrics.Counter + // The number of requests to tsdb status. + tsdbStatusWithFiltersRequests *metrics.Counter + + // The number of errors during requests to tsdb status. + tsdbStatusWithFiltersErrors *metrics.Counter + // The number of requests to seriesCount. 
seriesCountRequests *metrics.Counter @@ -1626,6 +1676,22 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui return status, nil } +func (sn *storageNode) getTSDBStatusWithFilters(requestData []byte, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) { + var status *storage.TSDBStatus + f := func(bc *handshake.BufferedConn) error { + st, err := sn.getTSDBStatusWithFiltersOnConn(bc, requestData, topN) + if err != nil { + return err + } + status = st + return nil + } + if err := sn.execOnConnWithPossibleRetry("tsdbStatusWithFilters_v1", f, deadline); err != nil { + return nil, err + } + return status, nil +} + func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline searchutils.Deadline) (uint64, error) { var n uint64 f := func(bc *handshake.BufferedConn) error { @@ -2108,6 +2174,49 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac return status, nil } +func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) { + // Send the request to sn. + if err := writeBytes(bc, requestData); err != nil { + return nil, fmt.Errorf("cannot write requestData: %w", err) + } + // topN shouldn't exceed 32 bits, so send it as uint32. + if err := writeUint32(bc, uint32(topN)); err != nil { + return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err) + } + if err := bc.Flush(); err != nil { + return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err) + } + + // Read response error. 
+ buf, err := readBytes(nil, bc, maxErrorMessageSize) + if err != nil { + return nil, fmt.Errorf("cannot read error message: %w", err) + } + if len(buf) > 0 { + return nil, newErrRemote(buf) + } + + // Read response + seriesCountByMetricName, err := readTopHeapEntries(bc) + if err != nil { + return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err) + } + labelValueCountByLabelName, err := readTopHeapEntries(bc) + if err != nil { + return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %w", err) + } + seriesCountByLabelValuePair, err := readTopHeapEntries(bc) + if err != nil { + return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %w", err) + } + status := &storage.TSDBStatus{ + SeriesCountByMetricName: seriesCountByMetricName, + LabelValueCountByLabelName: labelValueCountByLabelName, + SeriesCountByLabelValuePair: seriesCountByLabelValuePair, + } + return status, nil +} + func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) { n, err := readUint64(bc) if err != nil { @@ -2368,6 +2477,8 @@ func InitStorageNodes(addrs []string) { tagValueSuffixesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)), tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), tsdbStatusErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), + tsdbStatusWithFiltersRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q}`, addr)), + tsdbStatusWithFiltersErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q}`, addr)), seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", 
type="rpcClient", name="vmselect", addr=%q}`, addr)), seriesCountErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)), diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index d6e331d15..606104ffe 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -691,11 +691,19 @@ const secsPerDay = 3600 * 24 // TSDBStatusHandler processes /api/v1/status/tsdb request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats +// +// It can accept `match[]` filters in order to narrow down the search. func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { deadline := searchutils.GetDeadlineForStatusRequest(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } + etf, err := searchutils.GetEnforcedTagFiltersFromRequest(r) + if err != nil { + return err + } + matches := getMatchesFromRequest(r) + date := fasttime.UnixDate() dateStr := r.FormValue("date") if len(dateStr) > 0 { @@ -721,9 +729,18 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite topN = n } denyPartialResponse := searchutils.GetDenyPartialResponse(r) - status, isPartial, err := netstorage.GetTSDBStatusForDate(at, denyPartialResponse, deadline, date, topN) - if err != nil { - return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err) + var status *storage.TSDBStatus + var isPartial bool + if len(matches) == 0 && len(etf) == 0 { + status, isPartial, err = netstorage.GetTSDBStatusForDate(at, denyPartialResponse, deadline, date, topN) + if err != nil { + return fmt.Errorf(`cannot obtain tsdb status for 
date=%d, topN=%d: %w`, date, topN, err) + } + } else { + status, isPartial, err = tsdbStatusWithMatches(at, denyPartialResponse, matches, etf, date, topN, deadline) + if err != nil { + return fmt.Errorf("cannot obtain tsdb status with matches for date=%d, topN=%d: %w", date, topN, err) + } } w.Header().Set("Content-Type", "application/json; charset=utf-8") @@ -737,6 +754,25 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite return nil } +func tsdbStatusWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, etf []storage.TagFilter, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) { + tagFilterss, err := getTagFilterssFromMatches(matches) + if err != nil { + return nil, false, err + } + tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf) + if len(tagFilterss) == 0 { + logger.Panicf("BUG: tagFilterss must be non-empty") + } + start := int64(date*secsPerDay) * 1000 + end := int64(date*secsPerDay+secsPerDay) * 1000 + sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) + status, isPartial, err := netstorage.GetTSDBStatusWithFilters(at, denyPartialResponse, deadline, sq, topN) + if err != nil { + return nil, false, err + } + return status, isPartial, nil +} + var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`) // LabelsHandler processes /api/v1/labels request. 
diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 3b9d5cdec..4dae825e8 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -514,6 +514,8 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error { return s.processVMSelectSeriesCount(ctx) case "tsdbStatus_v2": return s.processVMSelectTSDBStatus(ctx) + case "tsdbStatusWithFilters_v1": + return s.processVMSelectTSDBStatusWithFilters(ctx) case "deleteMetrics_v3": return s.processVMSelectDeleteMetrics(ctx) case "registerMetricNames_v1": @@ -930,6 +932,50 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error { return nil } +func (s *Server) processVMSelectTSDBStatusWithFilters(ctx *vmselectRequestCtx) error { + vmselectTSDBStatusWithFiltersRequests.Inc() + + // Read request + if err := ctx.readSearchQuery(); err != nil { + return err + } + topN, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read topN: %w", err) + } + + // Execute the request + tr := storage.TimeRange{ + MinTimestamp: ctx.sq.MinTimestamp, + MaxTimestamp: ctx.sq.MaxTimestamp, + } + if err := ctx.setupTfss(s.storage, tr); err != nil { + return ctx.writeErrorMessage(err) + } + date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000) + status, err := s.storage.GetTSDBStatusWithFiltersForDate(ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, int(topN), ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + + // Send an empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send status to vmselect. 
+ if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil { + return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil { + return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err) + } + if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil { + return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err) + } + return nil +} + func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error { if err := ctx.writeUint64(uint64(len(a))); err != nil { return fmt.Errorf("cannot write topHeapEntries size: %w", err) @@ -1078,6 +1124,7 @@ var ( vmselectLabelEntriesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_entries"}`) vmselectSeriesCountRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="series_count"}`) vmselectTSDBStatusRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status"}`) + vmselectTSDBStatusWithFiltersRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status_with_filters"}`) vmselectSearchMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search_metric_names"}`) vmselectSearchRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search"}`) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2b5e43186..f21ccd808 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,6 +9,7 @@ sort: 15 * FEATURE: vmalert: add ability to pass `round_digits` query arg to datasource via `-datasource.roundDigits` command-line flag. This can be used for limiting the number of decimal digits after the point in recording rule results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/525). * FEATURE: return `X-Server-Hostname` header in http responses of all the VictoriaMetrics components. 
This should simplify tracing the origin server behind a load balancer or behind auth proxy during troubleshooting. * FEATURE: vmselect: allow to use 2x more memory for query processing at `vmselect` nodes in [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html). This should allow processing heavy queries without the need to increase RAM size at `vmselect` nodes. +* FEATURE: add ability to filter `/api/v1/status/tsdb` output with arbitrary [time series selectors](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) passed via `match[]` query args. See [these docs](https://docs.victoriametrics.com/#tsdb-stats) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1168) for details. * BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Previously `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). * BUGFIX: properly remove stale parts outside the configured retention if `-retentionPeriod` is smaller than one month. Previously stale parts could remain active for up to a month after they go outside the retention. diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index ec971cad2..128cc06a1 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -201,9 +201,7 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co - `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above). - `api/v1/export/csv` - exports data in CSV. It may be imported into another VictoriaMetrics via `api/v1/import/csv` (see above). - `api/v1/series/count` - returns the total number of series.
- - `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details. - VictoriaMetrics accepts optional `topN=N` and `date=YYYY-MM-DD` query args for this handler, where `N` is the number of top entries to return in the response - and `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. + - `api/v1/status/tsdb` - for time series stats. See [these docs](https://docs.victoriametrics.com/#tsdb-stats) for details. - `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries, which is returned in the response. - `api/v1/status/top_queries` - for listing the most frequently executed queries and queries taking the most duration. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 64657442e..93ce45acc 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -553,9 +553,7 @@ VictoriaMetrics supports the following handlers from [Prometheus querying API](h * [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) * [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) * [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) -* [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts optional `topN=N` and `date=YYYY-MM-DD` - query args for this handler, where `N` is the number of top entries to return in the response and `YYYY-MM-DD` is the date for collecting the stats. - By default top 10 entries are returned and the stats is collected for the current day. +* [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). 
See [these docs](#tsdb-stats) for details. * [/api/v1/targets](https://prometheus.io/docs/prometheus/latest/querying/api/#targets) - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter) for more details. These handlers can be queried from Prometheus-compatible clients such as Grafana or curl. @@ -1328,6 +1326,16 @@ VictoriaMetrics also exposes currently running queries with their execution time See the example of alerting rules for VM components [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml). + +## TSDB stats + +VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way similar to Prometheus - see [these Prometheus docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts the following optional query args at `/api/v1/status/tsdb` page: + * `topN=N` where `N` is the number of top entries to return in the response. By default top 10 entries are returned. + * `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. + * `match[]=SELECTOR` where `SELECTOR` is an arbitrary [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for series to take into account during stats calculation. By default all the series are taken into account. + * `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details. + + ## Troubleshooting * It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need
-* Metrics and labels leading to high cardinality or high churn rate can be determined at `/api/v1/status/tsdb` page. - See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details. - VictoriaMetrics accepts optional `date=YYYY-MM-DD` and `topN=42` args on this page. By default `date` equals to the current date, - while `topN` equals to 10. +* Metrics and labels leading to high cardinality or high churn rate can be determined at `/api/v1/status/tsdb` page. See [these docs](#tsdb-stats) for details. * New time series can be logged if `-logNewSeries` command-line flag is passed to VictoriaMetrics. diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 7ffdda757..c4dde4067 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1332,6 +1332,140 @@ func (is *indexSearch) getSeriesCount() (uint64, error) { return metricIDsLen, nil } +// GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss, date, accountID and projectID. +func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { + is := db.getIndexSearch(accountID, projectID, deadline) + status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline) + db.putIndexSearch(is) + if err != nil { + return nil, err + } + if status.hasEntries() { + return status, nil + } + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(accountID, projectID, deadline) + status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, fmt.Errorf("error when obtaining TSDB status from extDB: %w", err) + } + return status, nil +} + +// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date. 
+func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { + tr := TimeRange{ + MinTimestamp: int64(date) * msecPerDay, + MaxTimestamp: int64(date+1) * msecPerDay, + } + metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9) + if err != nil { + return nil, err + } + if metricIDs.Len() == 0 { + // Nothing found. + return &TSDBStatus{}, nil + } + + // The code below must be in sync with getTSDBStatusForDate + ts := &is.ts + kb := &is.kb + mp := &is.mp + thLabelValueCountByLabelName := newTopHeap(topN) + thSeriesCountByLabelValuePair := newTopHeap(topN) + thSeriesCountByMetricName := newTopHeap(topN) + var tmp, labelName, labelNameValue []byte + var labelValueCountByLabelName, seriesCountByLabelValuePair uint64 + nameEqualBytes := []byte("__name__=") + + loopsPaceLimiter := 0 + kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = encoding.MarshalUint64(kb.B, date) + prefix := kb.B + ts.Seek(prefix) + for ts.NextItem() { + if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return nil, err + } + } + loopsPaceLimiter++ + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil { + return nil, err + } + mp.ParseMetricIDs() + matchingSeriesCount := 0 + for _, metricID := range mp.MetricIDs { + if metricIDs.Has(metricID) { + matchingSeriesCount++ + } + } + if matchingSeriesCount == 0 { + // Skip rows without matching metricIDs. + continue + } + tail := item[len(prefix):] + var err error + tail, tmp, err = unmarshalTagValue(tmp[:0], tail) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err) + } + if isArtificialTagKey(tmp) { + // Skip artificially created tag keys. + continue + } + if len(tmp) == 0 { + tmp = append(tmp, "__name__"...) 
+ } + if !bytes.Equal(tmp, labelName) { + thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName) + labelValueCountByLabelName = 0 + labelName = append(labelName[:0], tmp...) + } + tmp = append(tmp, '=') + tail, tmp, err = unmarshalTagValue(tmp, tail) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal tag value from line %q: %w", item, err) + } + if !bytes.Equal(tmp, labelNameValue) { + thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair) + if bytes.HasPrefix(labelNameValue, nameEqualBytes) { + thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair) + } + seriesCountByLabelValuePair = 0 + labelValueCountByLabelName++ + labelNameValue = append(labelNameValue[:0], tmp...) + } + if err := mp.InitOnlyTail(item, tail); err != nil { + return nil, err + } + // Take into account deleted timeseries too. + // It is OK if series can be counted multiple times in rare cases - + // the returned number is an estimation. + seriesCountByLabelValuePair += uint64(matchingSeriesCount) + } + if err := ts.Error(); err != nil { + return nil, fmt.Errorf("error when counting time series by metric names: %w", err) + } + thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName) + thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair) + if bytes.HasPrefix(labelNameValue, nameEqualBytes) { + thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair) + } + status := &TSDBStatus{ + SeriesCountByMetricName: thSeriesCountByMetricName.getSortedResult(), + LabelValueCountByLabelName: thLabelValueCountByLabelName.getSortedResult(), + SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(), + } + return status, nil +} + // GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID. 
func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { is := db.getIndexSearch(accountID, projectID, deadline) @@ -1358,6 +1492,7 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64 } func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) { + // The code below must be in sync with getTSDBStatusWithFiltersForDate ts := &is.ts kb := &is.kb mp := &is.mp @@ -2315,21 +2450,9 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) } func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { - metricIDs := &uint64set.Set{} - for _, tfs := range tfss { - if len(tfs.tfs) == 0 { - // An empty filters must be equivalent to `{__name__!=""}` - tfs = NewTagFilters(tfs.accountID, tfs.projectID) - if err := tfs.Add(nil, nil, true, false); err != nil { - logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err) - } - } - if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil { - return nil, err - } - if metricIDs.Len() > maxMetrics { - return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics) - } + metricIDs, err := is.searchMetricIDsInternal(tfss, tr, maxMetrics) + if err != nil { + return nil, err } if metricIDs.Len() == 0 { // Nothing found @@ -2353,6 +2476,26 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr return sortedMetricIDs, nil } +func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange, maxMetrics int) (*uint64set.Set, error) { + metricIDs := &uint64set.Set{} + for _, tfs := range tfss { + if len(tfs.tfs) == 0 { + // An empty filters must be equivalent to `{__name__!=""}` + tfs = NewTagFilters(tfs.accountID, tfs.projectID) + if err := tfs.Add(nil, nil, true, 
false); err != nil { + logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err) + } + } + if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil { + return nil, err + } + if metricIDs.Len() > maxMetrics { + return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics) + } + } + return metricIDs, nil +} + func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) if err == nil { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 6c75e15a2..db6d1ff8d 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1771,6 +1771,28 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { if !reflect.DeepEqual(status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair) { t.Fatalf("unexpected SeriesCountByLabelValuePair;\ngot\n%v\nwant\n%v", status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair) } + + // Check GetTSDBStatusWithFiltersForDate + tfs = NewTagFilters(accountID, projectID) + if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil { + t.Fatalf("cannot add filter: %s", err) + } + status, err = db.GetTSDBStatusWithFiltersForDate(accountID, projectID, []*TagFilters{tfs}, baseDate, 5, noDeadline) + if err != nil { + t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err) + } + if !status.hasEntries() { + t.Fatalf("expecting non-empty TSDB status") + } + expectedSeriesCountByMetricName = []TopHeapEntry{ + { + Name: "testMetric", + Count: 1000, + }, + } + if !reflect.DeepEqual(status.SeriesCountByMetricName, expectedSeriesCountByMetricName) { + t.Fatalf("unexpected SeriesCountByMetricName;\ngot\n%v\nwant\n%v", status.SeriesCountByMetricName, expectedSeriesCountByMetricName) + } } func 
toTFPointers(tfs []tagFilter) []*tagFilter { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 776c7e309..8e72a1e6a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1324,6 +1324,11 @@ func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN, deadline) } +// GetTSDBStatusWithFiltersForDate returns TSDB status data for /api/v1/status/tsdb with match[] filters and the given (accountID, projectID). +func (s *Storage) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { + return s.idb().GetTSDBStatusWithFiltersForDate(accountID, projectID, tfss, date, topN, deadline) +} + // MetricRow is a metric to insert into storage. type MetricRow struct { // MetricNameRaw contains raw metric name, which must be decoded