Adds tsdb match filters (#1282)

* init work on filters

* init propose for status filters

* fixes tsdb status
adds test

* fix bug

* removes checks from test
Nikolay 2021-05-12 15:18:45 +03:00 committed by Aliaksandr Valialkin
parent 56b08390f6
commit be87be34a4
10 changed files with 397 additions and 31 deletions

View file

@ -197,9 +197,7 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co
- `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above).
- `api/v1/export/csv` - exports data in CSV. It may be imported into another VictoriaMetrics via `api/v1/import/csv` (see above).
- `api/v1/series/count` - returns the total number of series.
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
VictoriaMetrics accepts optional `topN=N` and `date=YYYY-MM-DD` query args for this handler, where `N` is the number of top entries to return in the response
and `YYYY-MM-DD` is the date for collecting the stats. By default the stats are collected for the current day.
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://docs.victoriametrics.com/#tsdb-stats) for details.
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.
- `api/v1/status/top_queries` - for listing the most frequently executed queries and queries taking the most duration.

View file

@ -1047,6 +1047,50 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
return a
}
// GetTSDBStatusWithFilters returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
//
// It accepts arbitrary filters on time series in sq.
func GetTSDBStatusWithFilters(at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline, sq *storage.SearchQuery, topN int) (*storage.TSDBStatus, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
status *storage.TSDBStatus
err error
}
snr := startStorageNodesRequest(denyPartialResponse, func(idx int, sn *storageNode) interface{} {
sn.tsdbStatusWithFiltersRequests.Inc()
status, err := sn.getTSDBStatusWithFilters(requestData, topN, deadline)
if err != nil {
sn.tsdbStatusWithFiltersErrors.Inc()
err = fmt.Errorf("cannot obtain tsdb status with filters from vmstorage %s: %w", sn.connPool.Addr(), err)
}
return &nodeResult{
status: status,
err: err,
}
})
// Collect results.
var statuses []*storage.TSDBStatus
isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result interface{}) error {
nr := result.(*nodeResult)
if nr.err != nil {
return nr.err
}
statuses = append(statuses, nr.status)
return nil
})
if err != nil {
return nil, isPartial, fmt.Errorf("cannot fetch tsdb status with filters from vmstorage nodes: %w", err)
}
status := mergeTSDBStatuses(statuses, topN)
return status, isPartial, nil
}
// GetSeriesCount returns the number of unique series for the given at.
func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline) (uint64, bool, error) {
if deadline.Exceeded() {
@ -1462,6 +1506,12 @@ type storageNode struct {
// The number of errors during requests to tsdb status.
tsdbStatusErrors *metrics.Counter
// The number of requests to tsdb status with filters.
tsdbStatusWithFiltersRequests *metrics.Counter
// The number of errors during requests to tsdb status with filters.
tsdbStatusWithFiltersErrors *metrics.Counter
// The number of requests to seriesCount.
seriesCountRequests *metrics.Counter
@ -1626,6 +1676,22 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui
return status, nil
}
func (sn *storageNode) getTSDBStatusWithFilters(requestData []byte, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
var status *storage.TSDBStatus
f := func(bc *handshake.BufferedConn) error {
st, err := sn.getTSDBStatusWithFiltersOnConn(bc, requestData, topN)
if err != nil {
return err
}
status = st
return nil
}
if err := sn.execOnConnWithPossibleRetry("tsdbStatusWithFilters_v1", f, deadline); err != nil {
return nil, err
}
return status, nil
}
func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline searchutils.Deadline) (uint64, error) {
var n uint64
f := func(bc *handshake.BufferedConn) error {
@ -2108,6 +2174,49 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac
return status, nil
}
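// getTSDBStatusWithFiltersOnConn implements the vmselect side of the tsdbStatusWithFilters_v1 protocol:
// the request consists of the marshaled SearchQuery followed by topN encoded as uint32;
// the response consists of an error message (empty on success) followed by three
// topHeapEntries lists - seriesCountByMetricName, labelValueCountByLabelName and
// seriesCountByLabelValuePair. The vmstorage side is processVMSelectTSDBStatusWithFilters.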
func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) {
// Send the request to sn.
if err := writeBytes(bc, requestData); err != nil {
return nil, fmt.Errorf("cannot write requestData: %w", err)
}
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32(bc, uint32(topN)); err != nil {
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return nil, fmt.Errorf("cannot read error message: %w", err)
}
if len(buf) > 0 {
return nil, newErrRemote(buf)
}
// Read response
seriesCountByMetricName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err)
}
labelValueCountByLabelName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %w", err)
}
seriesCountByLabelValuePair, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %w", err)
}
status := &storage.TSDBStatus{
SeriesCountByMetricName: seriesCountByMetricName,
LabelValueCountByLabelName: labelValueCountByLabelName,
SeriesCountByLabelValuePair: seriesCountByLabelValuePair,
}
return status, nil
}
func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) {
n, err := readUint64(bc)
if err != nil {
@ -2368,6 +2477,8 @@ func InitStorageNodes(addrs []string) {
tagValueSuffixesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusWithFiltersRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusWithFiltersErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),

View file

@ -691,11 +691,19 @@ const secsPerDay = 3600 * 24
// TSDBStatusHandler processes /api/v1/status/tsdb request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
//
// It can accept `match[]` filters in order to narrow down the search.
func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := searchutils.GetDeadlineForStatusRequest(r, startTime)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
etf, err := searchutils.GetEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
matches := getMatchesFromRequest(r)
date := fasttime.UnixDate()
dateStr := r.FormValue("date")
if len(dateStr) > 0 {
@ -721,9 +729,18 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
topN = n
}
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
status, isPartial, err := netstorage.GetTSDBStatusForDate(at, denyPartialResponse, deadline, date, topN)
if err != nil {
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
var status *storage.TSDBStatus
var isPartial bool
if len(matches) == 0 && len(etf) == 0 {
status, isPartial, err = netstorage.GetTSDBStatusForDate(at, denyPartialResponse, deadline, date, topN)
if err != nil {
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
}
} else {
status, isPartial, err = tsdbStatusWithMatches(at, denyPartialResponse, matches, etf, date, topN, deadline)
if err != nil {
return fmt.Errorf("cannot obtain tsdb status with matches for date=%d, topN=%d: %w", date, topN, err)
}
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@ -737,6 +754,25 @@ func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
return nil
}
func tsdbStatusWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, etf []storage.TagFilter, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) {
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return nil, false, err
}
tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf)
if len(tagFilterss) == 0 {
logger.Panicf("BUG: tagFilterss must be non-empty")
}
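// date is the number of days since the Unix epoch; convert it to a day-long [start, end) range in milliseconds.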
start := int64(date*secsPerDay) * 1000
end := int64(date*secsPerDay+secsPerDay) * 1000
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss)
status, isPartial, err := netstorage.GetTSDBStatusWithFilters(at, denyPartialResponse, deadline, sq, topN)
if err != nil {
return nil, false, err
}
return status, isPartial, nil
}
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
// LabelsHandler processes /api/v1/labels request.

View file

@ -514,6 +514,8 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
return s.processVMSelectSeriesCount(ctx)
case "tsdbStatus_v2":
return s.processVMSelectTSDBStatus(ctx)
case "tsdbStatusWithFilters_v1":
return s.processVMSelectTSDBStatusWithFilters(ctx)
case "deleteMetrics_v3":
return s.processVMSelectDeleteMetrics(ctx)
case "registerMetricNames_v1":
@ -930,6 +932,50 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processVMSelectTSDBStatusWithFilters(ctx *vmselectRequestCtx) error {
vmselectTSDBStatusWithFiltersRequests.Inc()
// Read request
if err := ctx.readSearchQuery(); err != nil {
return err
}
topN, err := ctx.readUint32()
if err != nil {
return fmt.Errorf("cannot read topN: %w", err)
}
// Execute the request
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
if err := ctx.setupTfss(s.storage, tr); err != nil {
return ctx.writeErrorMessage(err)
}
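// ctx.sq.MinTimestamp is in milliseconds; derive the date as the number of days since the Unix epoch.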
date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000)
status, err := s.storage.GetTSDBStatusWithFiltersForDate(ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, int(topN), ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
// Send an empty error message to vmselect.
if err := ctx.writeString(""); err != nil {
return fmt.Errorf("cannot send empty error message: %w", err)
}
// Send status to vmselect.
if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil {
return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err)
}
if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil {
return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err)
}
if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil {
return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err)
}
return nil
}
func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {
if err := ctx.writeUint64(uint64(len(a))); err != nil {
return fmt.Errorf("cannot write topHeapEntries size: %w", err)
@ -1078,6 +1124,7 @@ var (
vmselectLabelEntriesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_entries"}`)
vmselectSeriesCountRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="series_count"}`)
vmselectTSDBStatusRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status"}`)
vmselectTSDBStatusWithFiltersRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status_with_filters"}`)
vmselectSearchMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search_metric_names"}`)
vmselectSearchRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search"}`)

View file

@ -9,6 +9,7 @@ sort: 15
* FEATURE: vmalert: add ability to pass `round_digits` query arg to datasource via `-datasource.roundDigits` command-line flag. This can be used for limiting the number of decimal digits after the point in recording rule results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/525).
* FEATURE: return `X-Server-Hostname` header in http responses of all the VictoriaMetrics components. This should simplify tracing the origin server behind a load balancer or behind auth proxy during troubleshooting.
* FEATURE: vmselect: allow to use 2x more memory for query processing at `vmselect` nodes in [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html). This should allow processing heavy queries without the need to increase RAM size at `vmselect` nodes.
* FEATURE: add ability to filter `/api/v1/status/tsdb` output with arbitrary [time series selectors](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) passed via `match[]` query args. See [these docs](https://docs.victoriametrics.com/#tsdb-stats) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1168) for details.
* BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Previously `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240).
* BUGFIX: properly remove stale parts outside the configured retention if `-retentionPeriod` is smaller than one month. Previously stale parts could remain active for up to a month after they go outside the retention.

View file

@ -201,9 +201,7 @@ It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.co
- `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above).
- `api/v1/export/csv` - exports data in CSV. It may be imported into another VictoriaMetrics via `api/v1/import/csv` (see above).
- `api/v1/series/count` - returns the total number of series.
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
VictoriaMetrics accepts optional `topN=N` and `date=YYYY-MM-DD` query args for this handler, where `N` is the number of top entries to return in the response
and `YYYY-MM-DD` is the date for collecting the stats. By default the stats are collected for the current day.
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://docs.victoriametrics.com/#tsdb-stats) for details.
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.
- `api/v1/status/top_queries` - for listing the most frequently executed queries and queries taking the most duration.

View file

@ -553,9 +553,7 @@ VictoriaMetrics supports the following handlers from [Prometheus querying API](h
* [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers)
* [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names)
* [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values)
* [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts optional `topN=N` and `date=YYYY-MM-DD`
query args for this handler, where `N` is the number of top entries to return in the response and `YYYY-MM-DD` is the date for collecting the stats.
By default top 10 entries are returned and the stats are collected for the current day.
* [/api/v1/status/tsdb](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). See [these docs](#tsdb-stats) for details.
* [/api/v1/targets](https://prometheus.io/docs/prometheus/latest/querying/api/#targets) - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter) for more details.
These handlers can be queried from Prometheus-compatible clients such as Grafana or curl.
@ -1328,6 +1326,16 @@ VictoriaMetrics also exposes currently running queries with their execution time
See the example of alerting rules for VM components [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml).
## TSDB stats
VictoriaMetrics returns TSDB stats at the `/api/v1/status/tsdb` page in a way similar to Prometheus - see [these Prometheus docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts the following optional query args at the `/api/v1/status/tsdb` page (a sample query follows the list):
* `topN=N` where `N` is the number of top entries to return in the response. By default the top 10 entries are returned.
* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats are collected for the current day.
* `match[]=SELECTOR` where `SELECTOR` is an arbitrary [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for series to take into account during stats calculation. By default all the series are taken into account.
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.
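A minimal Go sketch of such a query against a single-node instance; the address, date and `match[]` selector below are illustrative assumptions, not part of this commit:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Assumed single-node VictoriaMetrics listening on the default port 8428.
	base := "http://localhost:8428/api/v1/status/tsdb"
	params := url.Values{}
	params.Set("topN", "5")
	params.Set("date", "2021-05-12")
	// Hypothetical selector: take into account only node_exporter series.
	params.Add("match[]", `{job="node_exporter"}`)

	resp, err := http.Get(base + "?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Print the raw JSON response with the per-category topN entries.
	fmt.Println(string(body))
}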
## Troubleshooting
* It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need
@ -1384,10 +1392,7 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
It may be needed in order to suppress default gap filling algorithm used by VictoriaMetrics - by default it assumes
each time series is continuous instead of discrete, so it fills gaps between real samples with regular intervals.
* Metrics and labels leading to high cardinality or high churn rate can be determined at `/api/v1/status/tsdb` page.
See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
VictoriaMetrics accepts optional `date=YYYY-MM-DD` and `topN=42` args on this page. By default `date` equals the current date,
while `topN` equals 10.
* Metrics and labels leading to high cardinality or high churn rate can be determined at `/api/v1/status/tsdb` page. See [these docs](#tsdb-stats) for details.
* New time series can be logged if `-logNewSeries` command-line flag is passed to VictoriaMetrics.

View file

@ -1332,6 +1332,140 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
return metricIDsLen, nil
}
// GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss, date, accountID and projectID.
func (db *indexDB) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch(accountID, projectID, deadline)
status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
if status.hasEntries() {
return status, nil
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(accountID, projectID, deadline)
status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN, deadline)
extDB.putIndexSearch(is)
})
if ok && err != nil {
return nil, fmt.Errorf("error when obtaining TSDB status from extDB: %w", err)
}
return status, nil
}
// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
tr := TimeRange{
MinTimestamp: int64(date) * msecPerDay,
MaxTimestamp: int64(date+1) * msecPerDay,
}
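// Pass an effectively unlimited maxMetrics (2e9), so the stats cover all the series matching tfss for the given date.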
metricIDs, err := is.searchMetricIDsInternal(tfss, tr, 2e9)
if err != nil {
return nil, err
}
if metricIDs.Len() == 0 {
// Nothing found.
return &TSDBStatus{}, nil
}
// The code below must be in sync with getTSDBStatusForDate
ts := &is.ts
kb := &is.kb
mp := &is.mp
thLabelValueCountByLabelName := newTopHeap(topN)
thSeriesCountByLabelValuePair := newTopHeap(topN)
thSeriesCountByMetricName := newTopHeap(topN)
var tmp, labelName, labelNameValue []byte
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
nameEqualBytes := []byte("__name__=")
loopsPaceLimiter := 0
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B
ts.Seek(prefix)
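// Scan the (date, tag) -> metricIDs index rows for the given date and take into account only the metricIDs matching the filters.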
for ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
break
}
if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
return nil, err
}
mp.ParseMetricIDs()
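// Count how many metricIDs in this index row belong to the set of metricIDs matching the filters.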
matchingSeriesCount := 0
for _, metricID := range mp.MetricIDs {
if metricIDs.Has(metricID) {
matchingSeriesCount++
}
}
if matchingSeriesCount == 0 {
// Skip rows without matching metricIDs.
continue
}
tail := item[len(prefix):]
var err error
tail, tmp, err = unmarshalTagValue(tmp[:0], tail)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err)
}
if isArtificialTagKey(tmp) {
// Skip artificially created tag keys.
continue
}
if len(tmp) == 0 {
tmp = append(tmp, "__name__"...)
}
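// Index rows are sorted, so entries for the same label name (and for the same label=value pair below) are adjacent;
// flush the accumulated counters whenever the current group changes.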
if !bytes.Equal(tmp, labelName) {
thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
labelValueCountByLabelName = 0
labelName = append(labelName[:0], tmp...)
}
tmp = append(tmp, '=')
tail, tmp, err = unmarshalTagValue(tmp, tail)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal tag value from line %q: %w", item, err)
}
if !bytes.Equal(tmp, labelNameValue) {
thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
}
seriesCountByLabelValuePair = 0
labelValueCountByLabelName++
labelNameValue = append(labelNameValue[:0], tmp...)
}
if err := mp.InitOnlyTail(item, tail); err != nil {
return nil, err
}
// Take into account deleted timeseries too.
// It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
seriesCountByLabelValuePair += uint64(matchingSeriesCount)
}
if err := ts.Error(); err != nil {
return nil, fmt.Errorf("error when counting time series by metric names: %w", err)
}
thLabelValueCountByLabelName.pushIfNonEmpty(labelName, labelValueCountByLabelName)
thSeriesCountByLabelValuePair.pushIfNonEmpty(labelNameValue, seriesCountByLabelValuePair)
if bytes.HasPrefix(labelNameValue, nameEqualBytes) {
thSeriesCountByMetricName.pushIfNonEmpty(labelNameValue[len(nameEqualBytes):], seriesCountByLabelValuePair)
}
status := &TSDBStatus{
SeriesCountByMetricName: thSeriesCountByMetricName.getSortedResult(),
LabelValueCountByLabelName: thLabelValueCountByLabelName.getSortedResult(),
SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
}
return status, nil
}
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID.
func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch(accountID, projectID, deadline)
@ -1358,6 +1492,7 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64
}
func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) {
// The code below must be in sync with getTSDBStatusWithFiltersForDate
ts := &is.ts
kb := &is.kb
mp := &is.mp
@ -2315,21 +2450,9 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
}
func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
metricIDs := &uint64set.Set{}
for _, tfs := range tfss {
if len(tfs.tfs) == 0 {
// An empty list of filters must be equivalent to `{__name__!=""}`
tfs = NewTagFilters(tfs.accountID, tfs.projectID)
if err := tfs.Add(nil, nil, true, false); err != nil {
logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err)
}
}
if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
return nil, err
}
if metricIDs.Len() > maxMetrics {
return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
}
metricIDs, err := is.searchMetricIDsInternal(tfss, tr, maxMetrics)
if err != nil {
return nil, err
}
if metricIDs.Len() == 0 {
// Nothing found
@ -2353,6 +2476,26 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
return sortedMetricIDs, nil
}
func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
metricIDs := &uint64set.Set{}
for _, tfs := range tfss {
if len(tfs.tfs) == 0 {
// An empty list of filters must be equivalent to `{__name__!=""}`
tfs = NewTagFilters(tfs.accountID, tfs.projectID)
if err := tfs.Add(nil, nil, true, false); err != nil {
logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err)
}
}
if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
return nil, err
}
if metricIDs.Len() > maxMetrics {
return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
}
}
return metricIDs, nil
}
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
if err == nil {

View file

@ -1771,6 +1771,28 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if !reflect.DeepEqual(status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair) {
t.Fatalf("unexpected SeriesCountByLabelValuePair;\ngot\n%v\nwant\n%v", status.SeriesCountByLabelValuePair, expectedSeriesCountByLabelValuePair)
}
// Check GetTSDBStatusWithFiltersForDate
tfs = NewTagFilters(accountID, projectID)
if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil {
t.Fatalf("cannot add filter: %s", err)
}
status, err = db.GetTSDBStatusWithFiltersForDate(accountID, projectID, []*TagFilters{tfs}, baseDate, 5, noDeadline)
if err != nil {
t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
}
if !status.hasEntries() {
t.Fatalf("expecting non-empty TSDB status")
}
expectedSeriesCountByMetricName = []TopHeapEntry{
{
Name: "testMetric",
Count: 1000,
},
}
if !reflect.DeepEqual(status.SeriesCountByMetricName, expectedSeriesCountByMetricName) {
t.Fatalf("unexpected SeriesCountByMetricName;\ngot\n%v\nwant\n%v", status.SeriesCountByMetricName, expectedSeriesCountByMetricName)
}
}
func toTFPointers(tfs []tagFilter) []*tagFilter {

View file

@ -1324,6 +1324,11 @@ func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64,
return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN, deadline)
}
// GetTSDBStatusWithFiltersForDate returns TSDB status data for /api/v1/status/tsdb with match[] filters and the given (accountID, projectID).
func (s *Storage) GetTSDBStatusWithFiltersForDate(accountID, projectID uint32, tfss []*TagFilters, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
return s.idb().GetTSDBStatusWithFiltersForDate(accountID, projectID, tfss, date, topN, deadline)
}
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded