mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vmselect: accept focusLabel
query arg at /api/v1/status/tsdb
This commit is contained in:
parent
fb77843639
commit
45fa9d798d
9 changed files with 208 additions and 309 deletions
|
@ -930,14 +930,16 @@ func deduplicateStrings(a []string) []string {
|
|||
return a
|
||||
}
|
||||
|
||||
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
||||
func GetTSDBStatusForDate(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool,
|
||||
deadline searchutils.Deadline, date uint64, topN, maxMetrics int) (*storage.TSDBStatus, bool, error) {
|
||||
qt = qt.NewChild("get tsdb stats for date=%d, topN=%d", date, topN)
|
||||
// GetTSDBStatus returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
||||
//
|
||||
// It accepts aribtrary filters on time series in sq.
|
||||
func GetTSDBStatus(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) {
|
||||
qt = qt.NewChild("get tsdb stats: %s, focusLabel=%q, topN=%d", sq, focusLabel, topN)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
requestData := sq.Marshal(nil)
|
||||
// Send the query to all the storage nodes in parallel.
|
||||
type nodeResult struct {
|
||||
status *storage.TSDBStatus
|
||||
|
@ -945,7 +947,7 @@ func GetTSDBStatusForDate(qt *querytracer.Tracer, at *auth.Token, denyPartialRes
|
|||
}
|
||||
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
|
||||
sn.tsdbStatusRequests.Inc()
|
||||
status, err := sn.getTSDBStatusForDate(qt, at.AccountID, at.ProjectID, date, topN, maxMetrics, deadline)
|
||||
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
|
||||
if err != nil {
|
||||
sn.tsdbStatusErrors.Inc()
|
||||
err = fmt.Errorf("cannot obtain tsdb status from vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||
|
@ -1026,53 +1028,6 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
|
|||
return a
|
||||
}
|
||||
|
||||
// GetTSDBStatusWithFilters returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
||||
//
|
||||
// It accepts aribtrary filters on time series in sq.
|
||||
func GetTSDBStatusWithFilters(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool,
|
||||
deadline searchutils.Deadline, sq *storage.SearchQuery, topN int) (*storage.TSDBStatus, bool, error) {
|
||||
qt = qt.NewChild("get tsdb stats: %s, topN=%d", sq, topN)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
requestData := sq.Marshal(nil)
|
||||
// Send the query to all the storage nodes in parallel.
|
||||
type nodeResult struct {
|
||||
status *storage.TSDBStatus
|
||||
err error
|
||||
}
|
||||
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
|
||||
sn.tsdbStatusWithFiltersRequests.Inc()
|
||||
status, err := sn.getTSDBStatusWithFilters(qt, requestData, topN, deadline)
|
||||
if err != nil {
|
||||
sn.tsdbStatusWithFiltersErrors.Inc()
|
||||
err = fmt.Errorf("cannot obtain tsdb status with filters from vmstorage %s: %w", sn.connPool.Addr(), err)
|
||||
}
|
||||
return &nodeResult{
|
||||
status: status,
|
||||
err: err,
|
||||
}
|
||||
})
|
||||
|
||||
// Collect results.
|
||||
var statuses []*storage.TSDBStatus
|
||||
isPartial, err := snr.collectResults(partialTSDBStatusResults, func(result interface{}) error {
|
||||
nr := result.(*nodeResult)
|
||||
if nr.err != nil {
|
||||
return nr.err
|
||||
}
|
||||
statuses = append(statuses, nr.status)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, isPartial, fmt.Errorf("cannot fetch tsdb status with filters from vmstorage nodes: %w", err)
|
||||
}
|
||||
|
||||
status := mergeTSDBStatuses(statuses, topN)
|
||||
return status, isPartial, nil
|
||||
}
|
||||
|
||||
// GetSeriesCount returns the number of unique series for the given at.
|
||||
func GetSeriesCount(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, deadline searchutils.Deadline) (uint64, bool, error) {
|
||||
qt = qt.NewChild("get series count")
|
||||
|
@ -1499,12 +1454,6 @@ type storageNode struct {
|
|||
// The number of errors during requests to tsdb status.
|
||||
tsdbStatusErrors *metrics.Counter
|
||||
|
||||
// The number of requests to tsdb status.
|
||||
tsdbStatusWithFiltersRequests *metrics.Counter
|
||||
|
||||
// The number of errors during requests to tsdb status.
|
||||
tsdbStatusWithFiltersErrors *metrics.Counter
|
||||
|
||||
// The number of requests to seriesCount.
|
||||
seriesCountRequests *metrics.Counter
|
||||
|
||||
|
@ -1605,34 +1554,17 @@ func (sn *storageNode) getTagValueSuffixes(qt *querytracer.Tracer, accountID, pr
|
|||
return suffixes, nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) getTSDBStatusForDate(qt *querytracer.Tracer, accountID, projectID uint32,
|
||||
date uint64, topN, maxMetrics int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
|
||||
func (sn *storageNode) getTSDBStatus(qt *querytracer.Tracer, requestData []byte, focusLabel string, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
|
||||
var status *storage.TSDBStatus
|
||||
f := func(bc *handshake.BufferedConn) error {
|
||||
st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN, maxMetrics)
|
||||
st, err := sn.getTSDBStatusOnConn(bc, requestData, focusLabel, topN)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
status = st
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry(qt, "tsdbStatus_v4", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) getTSDBStatusWithFilters(qt *querytracer.Tracer, requestData []byte, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
|
||||
var status *storage.TSDBStatus
|
||||
f := func(bc *handshake.BufferedConn) error {
|
||||
st, err := sn.getTSDBStatusWithFiltersOnConn(bc, requestData, topN)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
status = st
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry(qt, "tsdbStatusWithFilters_v3", f, deadline); err != nil {
|
||||
if err := sn.execOnConnWithPossibleRetry(qt, "tsdbStatus_v5", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return status, nil
|
||||
|
@ -2007,51 +1939,20 @@ func (sn *storageNode) getTagValueSuffixesOnConn(bc *handshake.BufferedConn, acc
|
|||
return suffixes, nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, accountID, projectID uint32, date uint64, topN, maxMetrics int) (*storage.TSDBStatus, error) {
|
||||
// Send the request to sn.
|
||||
if err := sendAccountIDProjectID(bc, accountID, projectID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// date shouldn't exceed 32 bits, so send it as uint32.
|
||||
if err := writeUint32(bc, uint32(date)); err != nil {
|
||||
return nil, fmt.Errorf("cannot send date=%d to conn: %w", date, err)
|
||||
}
|
||||
// topN shouldn't exceed 32 bits, so send it as uint32.
|
||||
if err := writeUint32(bc, uint32(topN)); err != nil {
|
||||
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
|
||||
}
|
||||
// maxMetrics shouldn't exceed 32 bits, so send it as uint32.
|
||||
if err := writeUint32(bc, uint32(maxMetrics)); err != nil {
|
||||
return nil, fmt.Errorf("cannot send maxMetrics=%d to conn: %w", maxMetrics, err)
|
||||
}
|
||||
if err := bc.Flush(); err != nil {
|
||||
return nil, fmt.Errorf("cannot flush tsdbStatus args to conn: %w", err)
|
||||
}
|
||||
|
||||
// Read response error.
|
||||
buf, err := readBytes(nil, bc, maxErrorMessageSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot read error message: %w", err)
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
return nil, newErrRemote(buf)
|
||||
}
|
||||
|
||||
// Read response
|
||||
return readTSDBStatus(bc)
|
||||
}
|
||||
|
||||
func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) {
|
||||
func (sn *storageNode) getTSDBStatusOnConn(bc *handshake.BufferedConn, requestData []byte, focusLabel string, topN int) (*storage.TSDBStatus, error) {
|
||||
// Send the request to sn.
|
||||
if err := writeBytes(bc, requestData); err != nil {
|
||||
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
||||
}
|
||||
if err := writeBytes(bc, []byte(focusLabel)); err != nil {
|
||||
return nil, fmt.Errorf("cannot write focusLabel=%q: %w", focusLabel, err)
|
||||
}
|
||||
// topN shouldn't exceed 32 bits, so send it as uint32.
|
||||
if err := writeUint32(bc, uint32(topN)); err != nil {
|
||||
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
|
||||
}
|
||||
if err := bc.Flush(); err != nil {
|
||||
return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err)
|
||||
return nil, fmt.Errorf("cannot flush tsdbStatus args to conn: %w", err)
|
||||
}
|
||||
|
||||
// Read response error.
|
||||
|
@ -2358,28 +2259,26 @@ func InitStorageNodes(addrs []string) {
|
|||
|
||||
concurrentQueries: metrics.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
|
||||
|
||||
registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
registerMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
deleteSeriesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelValuesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tagValueSuffixesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tsdbStatusErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tsdbStatusWithFiltersRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tsdbStatusWithFiltersErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatusWithFilters", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
seriesCountErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
|
||||
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
|
||||
registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
registerMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
deleteSeriesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
deleteSeriesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelValuesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
labelValuesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tagValueSuffixesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tagValueSuffixesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tsdbStatusRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
tsdbStatusErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
seriesCountErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
searchErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
|
||||
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
|
||||
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
|
||||
}
|
||||
storageNodes = append(storageNodes, sn)
|
||||
}
|
||||
|
|
|
@ -550,12 +550,17 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
|||
date := fasttime.UnixDate()
|
||||
dateStr := r.FormValue("date")
|
||||
if len(dateStr) > 0 {
|
||||
t, err := time.Parse("2006-01-02", dateStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `date` arg %q: %w", dateStr, err)
|
||||
if dateStr == "0" {
|
||||
date = 0
|
||||
} else {
|
||||
t, err := time.Parse("2006-01-02", dateStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `date` arg %q: %w", dateStr, err)
|
||||
}
|
||||
date = uint64(t.Unix()) / secsPerDay
|
||||
}
|
||||
date = uint64(t.Unix()) / secsPerDay
|
||||
}
|
||||
focusLabel := r.FormValue("focusLabel")
|
||||
topN := 10
|
||||
topNStr := r.FormValue("topN")
|
||||
if len(topNStr) > 0 {
|
||||
|
@ -572,18 +577,12 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
|||
topN = n
|
||||
}
|
||||
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
|
||||
var status *storage.TSDBStatus
|
||||
var isPartial bool
|
||||
if len(cp.filterss) == 0 {
|
||||
status, isPartial, err = netstorage.GetTSDBStatusForDate(qt, at, denyPartialResponse, cp.deadline, date, topN, *maxTSDBStatusSeries)
|
||||
if err != nil {
|
||||
return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
|
||||
}
|
||||
} else {
|
||||
status, isPartial, err = tsdbStatusWithMatches(qt, at, denyPartialResponse, cp.filterss, date, topN, *maxTSDBStatusSeries, cp.deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain tsdb status with matches for date=%d, topN=%d: %w", date, topN, err)
|
||||
}
|
||||
start := int64(date*secsPerDay) * 1000
|
||||
end := int64((date+1)*secsPerDay)*1000 - 1
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, cp.filterss, *maxTSDBStatusSeries)
|
||||
status, isPartial, err := netstorage.GetTSDBStatus(qt, at, denyPartialResponse, sq, focusLabel, topN, cp.deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain tsdb stats: %w", err)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
@ -596,17 +595,6 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
|||
return nil
|
||||
}
|
||||
|
||||
func tsdbStatusWithMatches(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, filterss [][]storage.TagFilter, date uint64, topN, maxMetrics int, deadline searchutils.Deadline) (*storage.TSDBStatus, bool, error) {
|
||||
start := int64(date*secsPerDay) * 1000
|
||||
end := int64(date*secsPerDay+secsPerDay) * 1000
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, filterss, maxMetrics)
|
||||
status, isPartial, err := netstorage.GetTSDBStatusWithFilters(qt, at, denyPartialResponse, deadline, sq, topN)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return status, isPartial, nil
|
||||
}
|
||||
|
||||
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
|
||||
|
||||
// LabelsHandler processes /api/v1/labels request.
|
||||
|
|
|
@ -14,6 +14,7 @@ TSDBStatusResponse generates response for /api/v1/status/tsdb .
|
|||
"totalLabelValuePairs": {%dul= status.TotalLabelValuePairs %},
|
||||
"seriesCountByMetricName":{%= tsdbStatusEntries(status.SeriesCountByMetricName) %},
|
||||
"seriesCountByLabelName":{%= tsdbStatusEntries(status.SeriesCountByLabelName) %},
|
||||
"seriesCountByFocusLabelValue":{%= tsdbStatusEntries(status.SeriesCountByFocusLabelValue) %},
|
||||
"seriesCountByLabelValuePair":{%= tsdbStatusEntries(status.SeriesCountByLabelValuePair) %},
|
||||
"labelValueCountByLabelName":{%= tsdbStatusEntries(status.LabelValueCountByLabelName) %}
|
||||
}
|
||||
|
|
|
@ -56,102 +56,106 @@ func StreamTSDBStatusResponse(qw422016 *qt422016.Writer, isPartial bool, status
|
|||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
|
||||
streamtsdbStatusEntries(qw422016, status.SeriesCountByLabelName)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
|
||||
qw422016.N().S(`,"seriesCountByFocusLabelValue":`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:17
|
||||
streamtsdbStatusEntries(qw422016, status.SeriesCountByFocusLabelValue)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:17
|
||||
qw422016.N().S(`,"seriesCountByLabelValuePair":`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:17
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
|
||||
streamtsdbStatusEntries(qw422016, status.SeriesCountByLabelValuePair)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:17
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
|
||||
qw422016.N().S(`,"labelValueCountByLabelName":`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:19
|
||||
streamtsdbStatusEntries(qw422016, status.LabelValueCountByLabelName)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:19
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:20
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
|
||||
qt.Done()
|
||||
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:22
|
||||
streamdumpQueryTrace(qw422016, qt)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:22
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
func WriteTSDBStatusResponse(qq422016 qtio422016.Writer, isPartial bool, status *storage.TSDBStatus, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
StreamTSDBStatusResponse(qw422016, isPartial, status, qt)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
func TSDBStatusResponse(isPartial bool, status *storage.TSDBStatus, qt *querytracer.Tracer) string {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
WriteTSDBStatusResponse(qb422016, isPartial, status, qt)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:24
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
|
||||
func streamtsdbStatusEntries(qw422016 *qt422016.Writer, a []storage.TopHeapEntry) {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
|
||||
qw422016.N().S(`[`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:27
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
|
||||
for i, e := range a {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:27
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
|
||||
qw422016.N().S(`{"name":`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:29
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
|
||||
qw422016.N().Q(e.Name)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:29
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
|
||||
qw422016.N().S(`,"value":`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:31
|
||||
qw422016.N().D(int(e.Count))
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:31
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:32
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
|
||||
if i+1 < len(a) {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:32
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:32
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
|
||||
}
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:34
|
||||
}
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:34
|
||||
qw422016.N().S(`]`)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
func writetsdbStatusEntries(qq422016 qtio422016.Writer, a []storage.TopHeapEntry) {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
streamtsdbStatusEntries(qw422016, a)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
func tsdbStatusEntries(a []storage.TopHeapEntry) string {
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
writetsdbStatusEntries(qb422016, a)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:35
|
||||
//line app/vmselect/prometheus/tsdb_status_response.qtpl:36
|
||||
}
|
||||
|
|
|
@ -539,10 +539,8 @@ func (s *Server) processVMSelectRPC(ctx *vmselectRequestCtx, rpcName string) err
|
|||
return s.processVMSelectLabelNames(ctx)
|
||||
case "seriesCount_v4":
|
||||
return s.processVMSelectSeriesCount(ctx)
|
||||
case "tsdbStatus_v4":
|
||||
case "tsdbStatus_v5":
|
||||
return s.processVMSelectTSDBStatus(ctx)
|
||||
case "tsdbStatusWithFilters_v3":
|
||||
return s.processVMSelectTSDBStatusWithFilters(ctx)
|
||||
case "deleteMetrics_v5":
|
||||
return s.processVMSelectDeleteMetrics(ctx)
|
||||
case "registerMetricNames_v3":
|
||||
|
@ -827,29 +825,29 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
|
|||
vmselectTSDBStatusRequests.Inc()
|
||||
|
||||
// Read request
|
||||
accountID, projectID, err := ctx.readAccountIDProjectID()
|
||||
if err != nil {
|
||||
if err := ctx.readSearchQuery(); err != nil {
|
||||
return err
|
||||
}
|
||||
date, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read date: %w", err)
|
||||
if err := ctx.readDataBufBytes(maxLabelValueSize); err != nil {
|
||||
return fmt.Errorf("cannot read focusLabel: %w", err)
|
||||
}
|
||||
focusLabel := string(ctx.dataBuf)
|
||||
topN, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read topN: %w", err)
|
||||
}
|
||||
maxMetricsUint32, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read MaxMetrics: %w", err)
|
||||
}
|
||||
maxMetrics := int(maxMetricsUint32)
|
||||
if maxMetrics < 0 {
|
||||
return fmt.Errorf("too big value for MaxMetrics=%d; must be smaller than 2e9", maxMetricsUint32)
|
||||
}
|
||||
|
||||
// Execute the request
|
||||
status, err := s.storage.GetTSDBStatusWithFiltersForDate(ctx.qt, accountID, projectID, nil, uint64(date), int(topN), maxMetrics, ctx.deadline)
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: ctx.sq.MinTimestamp,
|
||||
MaxTimestamp: ctx.sq.MaxTimestamp,
|
||||
}
|
||||
if err := ctx.setupTfss(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
maxMetrics := ctx.getMaxMetrics()
|
||||
date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000)
|
||||
status, err := s.storage.GetTSDBStatus(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, focusLabel, int(topN), maxMetrics, ctx.deadline)
|
||||
if err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
|
@ -882,42 +880,6 @@ func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) processVMSelectTSDBStatusWithFilters(ctx *vmselectRequestCtx) error {
|
||||
vmselectTSDBStatusWithFiltersRequests.Inc()
|
||||
|
||||
// Read request
|
||||
if err := ctx.readSearchQuery(); err != nil {
|
||||
return err
|
||||
}
|
||||
topN, err := ctx.readUint32()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read topN: %w", err)
|
||||
}
|
||||
|
||||
// Execute the request
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: ctx.sq.MinTimestamp,
|
||||
MaxTimestamp: ctx.sq.MaxTimestamp,
|
||||
}
|
||||
if err := ctx.setupTfss(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
maxMetrics := ctx.getMaxMetrics()
|
||||
date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000)
|
||||
status, err := s.storage.GetTSDBStatusWithFiltersForDate(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, date, int(topN), maxMetrics, ctx.deadline)
|
||||
if err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
|
||||
// Send an empty error message to vmselect.
|
||||
if err := ctx.writeString(""); err != nil {
|
||||
return fmt.Errorf("cannot send empty error message: %w", err)
|
||||
}
|
||||
|
||||
// Send status to vmselect.
|
||||
return writeTSDBStatus(ctx, status)
|
||||
}
|
||||
|
||||
func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {
|
||||
if err := ctx.writeUint64(uint64(len(a))); err != nil {
|
||||
return fmt.Errorf("cannot write topHeapEntries size: %w", err)
|
||||
|
@ -1062,16 +1024,15 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
|
|||
}
|
||||
|
||||
var (
|
||||
vmselectRegisterMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="register_metric_names"}`)
|
||||
vmselectDeleteMetricsRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="delete_metrics"}`)
|
||||
vmselectLabelNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_names"}`)
|
||||
vmselectLabelValuesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_values"}`)
|
||||
vmselectTagValueSuffixesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tag_value_suffixes"}`)
|
||||
vmselectSeriesCountRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="series_count"}`)
|
||||
vmselectTSDBStatusRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status"}`)
|
||||
vmselectTSDBStatusWithFiltersRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status_with_filters"}`)
|
||||
vmselectSearchMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search_metric_names"}`)
|
||||
vmselectSearchRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search"}`)
|
||||
vmselectRegisterMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="register_metric_names"}`)
|
||||
vmselectDeleteMetricsRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="delete_metrics"}`)
|
||||
vmselectLabelNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_names"}`)
|
||||
vmselectLabelValuesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="label_values"}`)
|
||||
vmselectTagValueSuffixesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tag_value_suffixes"}`)
|
||||
vmselectSeriesCountRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="series_count"}`)
|
||||
vmselectTSDBStatusRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="tsdb_status"}`)
|
||||
vmselectSearchMetricNamesRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search_metric_names"}`)
|
||||
vmselectSearchRequests = metrics.NewCounter(`vm_vmselect_rpc_requests_total{name="search"}`)
|
||||
|
||||
vmselectMetricBlocksRead = metrics.NewCounter(`vm_vmselect_metric_blocks_read_total`)
|
||||
vmselectMetricRowsRead = metrics.NewCounter(`vm_vmselect_metric_rows_read_total`)
|
||||
|
|
|
@ -272,7 +272,8 @@ See the [example VMUI at VictoriaMetrics playground](https://play.victoriametric
|
|||
VictoriaMetrics provides an ability to explore time series cardinality at `cardinality` tab in [vmui](#vmui) in the following ways:
|
||||
|
||||
- To identify metric names with the highest number of series.
|
||||
- To idnetify labels with the highest number of series.
|
||||
- To identify labels with the highest number of series.
|
||||
- To identify values with the highest number of series for the selected label (aka `focusLabel`).
|
||||
- To identify label=name pairs with the highest number of series.
|
||||
- To identify labels with the highest number of unique values.
|
||||
|
||||
|
@ -1445,6 +1446,7 @@ VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way simi
|
|||
|
||||
* `topN=N` where `N` is the number of top entries to return in the response. By default top 10 entries are returned.
|
||||
* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. Pass `date=1970-01-01` in order to collect global stats across all the days.
|
||||
* `focusLabel=LABEL_NAME` returns label values with the highest number of time series for the given `LABEL_NAME` in the `seriesCountByFocusLabelValue` list.
|
||||
* `match[]=SELECTOR` where `SELECTOR` is an arbitrary [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for series to take into account during stats calculation. By default all the series are taken into account.
|
||||
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.
|
||||
|
||||
|
|
|
@ -1263,11 +1263,11 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
|
|||
return metricIDsLen, nil
|
||||
}
|
||||
|
||||
// GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss, date, accountID and projectID.
|
||||
func (db *indexDB) GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
|
||||
// GetTSDBStatus returns topN entries for tsdb status for the given tfss, date and focusLabel.
|
||||
func (db *indexDB) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
|
||||
qtChild := qt.NewChild("collect tsdb stats in the current indexdb")
|
||||
is := db.getIndexSearch(accountID, projectID, deadline)
|
||||
status, err := is.getTSDBStatusWithFiltersForDate(qtChild, tfss, date, topN, maxMetrics)
|
||||
status, err := is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics)
|
||||
qtChild.Done()
|
||||
db.putIndexSearch(is)
|
||||
if err != nil {
|
||||
|
@ -1279,7 +1279,7 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, accou
|
|||
ok := db.doExtDB(func(extDB *indexDB) {
|
||||
qtChild := qt.NewChild("collect tsdb stats in the previous indexdb")
|
||||
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||
status, err = is.getTSDBStatusWithFiltersForDate(qtChild, tfss, date, topN, maxMetrics)
|
||||
status, err = is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics)
|
||||
qtChild.Done()
|
||||
extDB.putIndexSearch(is)
|
||||
})
|
||||
|
@ -1289,8 +1289,8 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, accou
|
|||
return status, nil
|
||||
}
|
||||
|
||||
// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
|
||||
func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, topN, maxMetrics int) (*TSDBStatus, error) {
|
||||
// getTSDBStatus returns topN entries for tsdb status for the given tfss, date and focusLabel.
|
||||
func (is *indexSearch) getTSDBStatus(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int) (*TSDBStatus, error) {
|
||||
filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1305,12 +1305,14 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, t
|
|||
dmis := is.db.s.getDeletedMetricIDs()
|
||||
thSeriesCountByMetricName := newTopHeap(topN)
|
||||
thSeriesCountByLabelName := newTopHeap(topN)
|
||||
thSeriesCountByFocusLabelValue := newTopHeap(topN)
|
||||
thSeriesCountByLabelValuePair := newTopHeap(topN)
|
||||
thLabelValueCountByLabelName := newTopHeap(topN)
|
||||
var tmp, prevLabelName, prevLabelValuePair []byte
|
||||
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
|
||||
var totalSeries, labelSeries, totalLabelValuePairs uint64
|
||||
nameEqualBytes := []byte("__name__=")
|
||||
focusLabelEqualBytes := []byte(focusLabel + "=")
|
||||
|
||||
loopsPaceLimiter := 0
|
||||
nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
|
||||
|
@ -1382,6 +1384,9 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, t
|
|||
if bytes.HasPrefix(prevLabelValuePair, nameEqualBytes) {
|
||||
thSeriesCountByMetricName.push(prevLabelValuePair[len(nameEqualBytes):], seriesCountByLabelValuePair)
|
||||
}
|
||||
if bytes.HasPrefix(prevLabelValuePair, focusLabelEqualBytes) {
|
||||
thSeriesCountByFocusLabelValue.push(prevLabelValuePair[len(focusLabelEqualBytes):], seriesCountByLabelValuePair)
|
||||
}
|
||||
seriesCountByLabelValuePair = 0
|
||||
labelValueCountByLabelName++
|
||||
prevLabelValuePair = append(prevLabelValuePair[:0], labelValuePair...)
|
||||
|
@ -1401,13 +1406,17 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, t
|
|||
if bytes.HasPrefix(prevLabelValuePair, nameEqualBytes) {
|
||||
thSeriesCountByMetricName.push(prevLabelValuePair[len(nameEqualBytes):], seriesCountByLabelValuePair)
|
||||
}
|
||||
if bytes.HasPrefix(prevLabelValuePair, focusLabelEqualBytes) {
|
||||
thSeriesCountByFocusLabelValue.push(prevLabelValuePair[len(focusLabelEqualBytes):], seriesCountByLabelValuePair)
|
||||
}
|
||||
status := &TSDBStatus{
|
||||
TotalSeries: totalSeries,
|
||||
TotalLabelValuePairs: totalLabelValuePairs,
|
||||
SeriesCountByMetricName: thSeriesCountByMetricName.getSortedResult(),
|
||||
SeriesCountByLabelName: thSeriesCountByLabelName.getSortedResult(),
|
||||
SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
|
||||
LabelValueCountByLabelName: thLabelValueCountByLabelName.getSortedResult(),
|
||||
TotalSeries: totalSeries,
|
||||
TotalLabelValuePairs: totalLabelValuePairs,
|
||||
SeriesCountByMetricName: thSeriesCountByMetricName.getSortedResult(),
|
||||
SeriesCountByLabelName: thSeriesCountByLabelName.getSortedResult(),
|
||||
SeriesCountByFocusLabelValue: thSeriesCountByFocusLabelValue.getSortedResult(),
|
||||
SeriesCountByLabelValuePair: thSeriesCountByLabelValuePair.getSortedResult(),
|
||||
LabelValueCountByLabelName: thLabelValueCountByLabelName.getSortedResult(),
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
@ -1416,12 +1425,13 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, t
|
|||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
||||
type TSDBStatus struct {
|
||||
TotalSeries uint64
|
||||
TotalLabelValuePairs uint64
|
||||
SeriesCountByMetricName []TopHeapEntry
|
||||
SeriesCountByLabelName []TopHeapEntry
|
||||
SeriesCountByLabelValuePair []TopHeapEntry
|
||||
LabelValueCountByLabelName []TopHeapEntry
|
||||
TotalSeries uint64
|
||||
TotalLabelValuePairs uint64
|
||||
SeriesCountByMetricName []TopHeapEntry
|
||||
SeriesCountByLabelName []TopHeapEntry
|
||||
SeriesCountByFocusLabelValue []TopHeapEntry
|
||||
SeriesCountByLabelValuePair []TopHeapEntry
|
||||
LabelValueCountByLabelName []TopHeapEntry
|
||||
}
|
||||
|
||||
func (status *TSDBStatus) hasEntries() bool {
|
||||
|
|
|
@ -1887,10 +1887,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
t.Fatalf("expected %d time series for all days, got %d time series", metricsPerDay*days, len(matchedTSIDs))
|
||||
}
|
||||
|
||||
// Check GetTSDBStatusWithFiltersForDate with nil filters.
|
||||
status, err := db.GetTSDBStatusWithFiltersForDate(nil, accountID, projectID, nil, baseDate, 5, 1e6, noDeadline)
|
||||
// Check GetTSDBStatus with nil filters.
|
||||
status, err := db.GetTSDBStatus(nil, accountID, projectID, nil, baseDate, "day", 5, 1e6, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in GetTSDBStatusWithFiltersForDate with nil filters: %s", err)
|
||||
t.Fatalf("error in GetTSDBStatus with nil filters: %s", err)
|
||||
}
|
||||
if !status.hasEntries() {
|
||||
t.Fatalf("expecting non-empty TSDB status")
|
||||
|
@ -1925,6 +1925,15 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
if !reflect.DeepEqual(status.SeriesCountByLabelName, expectedSeriesCountByLabelName) {
|
||||
t.Fatalf("unexpected SeriesCountByLabelName;\ngot\n%v\nwant\n%v", status.SeriesCountByLabelName, expectedSeriesCountByLabelName)
|
||||
}
|
||||
expectedSeriesCountByFocusLabelValue := []TopHeapEntry{
|
||||
{
|
||||
Name: "0",
|
||||
Count: 1000,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(status.SeriesCountByFocusLabelValue, expectedSeriesCountByFocusLabelValue) {
|
||||
t.Fatalf("unexpected SeriesCountByFocusLabelValue;\ngot\n%v\nwant\n%v", status.SeriesCountByFocusLabelValue, expectedSeriesCountByFocusLabelValue)
|
||||
}
|
||||
expectedLabelValueCountByLabelName := []TopHeapEntry{
|
||||
{
|
||||
Name: "uniqueid",
|
||||
|
@ -1980,14 +1989,14 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
|
||||
}
|
||||
|
||||
// Check GetTSDBStatusWithFiltersForDate with non-nil filter, which matches all the series
|
||||
// Check GetTSDBStatus with non-nil filter, which matches all the series
|
||||
tfs = NewTagFilters(accountID, projectID)
|
||||
if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil {
|
||||
t.Fatalf("cannot add filter: %s", err)
|
||||
}
|
||||
status, err = db.GetTSDBStatusWithFiltersForDate(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline)
|
||||
status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
|
||||
t.Fatalf("error in GetTSDBStatus: %s", err)
|
||||
}
|
||||
if !status.hasEntries() {
|
||||
t.Fatalf("expecting non-empty TSDB status")
|
||||
|
@ -2010,10 +2019,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
|
||||
}
|
||||
|
||||
// Check GetTSDBStatusWithFiltersOnDate with non-nil filter, which matches all the series on a global time range
|
||||
status, err = db.GetTSDBStatusWithFiltersForDate(nil, accountID, projectID, nil, 0, 5, 1e6, noDeadline)
|
||||
// Check GetTSDBStatus with non-nil filter, which matches all the series on a global time range
|
||||
status, err = db.GetTSDBStatus(nil, accountID, projectID, nil, 0, "day", 5, 1e6, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
|
||||
t.Fatalf("error in GetTSDBStatus: %s", err)
|
||||
}
|
||||
if !status.hasEntries() {
|
||||
t.Fatalf("expecting non-empty TSDB status")
|
||||
|
@ -2035,15 +2044,40 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
if status.TotalLabelValuePairs != expectedLabelValuePairs {
|
||||
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
|
||||
}
|
||||
expectedSeriesCountByFocusLabelValue = []TopHeapEntry{
|
||||
{
|
||||
Name: "0",
|
||||
Count: 1000,
|
||||
},
|
||||
{
|
||||
Name: "1",
|
||||
Count: 1000,
|
||||
},
|
||||
{
|
||||
Name: "2",
|
||||
Count: 1000,
|
||||
},
|
||||
{
|
||||
Name: "3",
|
||||
Count: 1000,
|
||||
},
|
||||
{
|
||||
Name: "4",
|
||||
Count: 1000,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(status.SeriesCountByFocusLabelValue, expectedSeriesCountByFocusLabelValue) {
|
||||
t.Fatalf("unexpected SeriesCountByFocusLabelValue;\ngot\n%v\nwant\n%v", status.SeriesCountByFocusLabelValue, expectedSeriesCountByFocusLabelValue)
|
||||
}
|
||||
|
||||
// Check GetTSDBStatusWithFiltersForDate with non-nil filter, which matches only 3 series
|
||||
// Check GetTSDBStatus with non-nil filter, which matches only 3 series
|
||||
tfs = NewTagFilters(accountID, projectID)
|
||||
if err := tfs.Add([]byte("uniqueid"), []byte("0|1|3"), false, true); err != nil {
|
||||
t.Fatalf("cannot add filter: %s", err)
|
||||
}
|
||||
status, err = db.GetTSDBStatusWithFiltersForDate(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline)
|
||||
status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, baseDate, "", 5, 1e6, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
|
||||
t.Fatalf("error in GetTSDBStatus: %s", err)
|
||||
}
|
||||
if !status.hasEntries() {
|
||||
t.Fatalf("expecting non-empty TSDB status")
|
||||
|
@ -2066,10 +2100,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
|
||||
}
|
||||
|
||||
// Check GetTSDBStatusWithFiltersForDate with non-nil filter on global time range, which matches only 15 series
|
||||
status, err = db.GetTSDBStatusWithFiltersForDate(nil, accountID, projectID, []*TagFilters{tfs}, 0, 5, 1e6, noDeadline)
|
||||
// Check GetTSDBStatus with non-nil filter on global time range, which matches only 15 series
|
||||
status, err = db.GetTSDBStatus(nil, accountID, projectID, []*TagFilters{tfs}, 0, "", 5, 1e6, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
|
||||
t.Fatalf("error in GetTSDBStatus: %s", err)
|
||||
}
|
||||
if !status.hasEntries() {
|
||||
t.Fatalf("expecting non-empty TSDB status")
|
||||
|
|
|
@ -1541,9 +1541,9 @@ func (s *Storage) GetSeriesCount(accountID, projectID uint32, deadline uint64) (
|
|||
return s.idb().GetSeriesCount(accountID, projectID, deadline)
|
||||
}
|
||||
|
||||
// GetTSDBStatusWithFiltersForDate returns TSDB status data for /api/v1/status/tsdb with match[] filters and the given (accountID, projectID).
|
||||
func (s *Storage) GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
|
||||
return s.idb().GetTSDBStatusWithFiltersForDate(qt, accountID, projectID, tfss, date, topN, maxMetrics, deadline)
|
||||
// GetTSDBStatus returns TSDB status data for /api/v1/status/tsdb
|
||||
func (s *Storage) GetTSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
|
||||
return s.idb().GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
|
||||
}
|
||||
|
||||
// MetricRow is a metric to insert into storage.
|
||||
|
|
Loading…
Reference in a new issue