mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
8adba82c02
This should reduce contention on the channel with unpack work for systems with high number of CPU cores
688 lines
18 KiB
Go
688 lines
18 KiB
Go
package netstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"runtime"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
maxTagKeysPerSearch = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search")
|
|
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search")
|
|
maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan")
|
|
)
|
|
|
|
// Result is a single timeseries result.
|
|
//
|
|
// ProcessSearchQuery returns Result slice.
|
|
type Result struct {
|
|
// The name of the metric.
|
|
MetricName storage.MetricName
|
|
|
|
// Values are sorted by Timestamps.
|
|
Values []float64
|
|
Timestamps []int64
|
|
|
|
// Marshaled MetricName. Used only for results sorting
|
|
// in app/vmselect/promql
|
|
MetricNameMarshaled []byte
|
|
}
|
|
|
|
func (r *Result) reset() {
|
|
r.MetricName.Reset()
|
|
r.Values = r.Values[:0]
|
|
r.Timestamps = r.Timestamps[:0]
|
|
r.MetricNameMarshaled = r.MetricNameMarshaled[:0]
|
|
}
|
|
|
|
// Results holds results returned from ProcessSearchQuery.
|
|
type Results struct {
|
|
tr storage.TimeRange
|
|
fetchData bool
|
|
deadline Deadline
|
|
|
|
packedTimeseries []packedTimeseries
|
|
sr *storage.Search
|
|
}
|
|
|
|
// Len returns the number of results in rss.
|
|
func (rss *Results) Len() int {
|
|
return len(rss.packedTimeseries)
|
|
}
|
|
|
|
// Cancel cancels rss work.
|
|
func (rss *Results) Cancel() {
|
|
rss.mustClose()
|
|
}
|
|
|
|
func (rss *Results) mustClose() {
|
|
putStorageSearch(rss.sr)
|
|
rss.sr = nil
|
|
}
|
|
|
|
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
|
|
|
|
type timeseriesWork struct {
|
|
rss *Results
|
|
pts *packedTimeseries
|
|
f func(rs *Result, workerID uint)
|
|
doneCh chan error
|
|
|
|
rowsProcessed int
|
|
}
|
|
|
|
func init() {
|
|
for i := 0; i < gomaxprocs; i++ {
|
|
go timeseriesWorker(uint(i))
|
|
}
|
|
}
|
|
|
|
func timeseriesWorker(workerID uint) {
|
|
var rs Result
|
|
var rsLastResetTime uint64
|
|
for tsw := range timeseriesWorkCh {
|
|
rss := tsw.rss
|
|
if rss.deadline.Exceeded() {
|
|
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
|
continue
|
|
}
|
|
if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil {
|
|
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
|
|
continue
|
|
}
|
|
if len(rs.Timestamps) > 0 || !rss.fetchData {
|
|
tsw.f(&rs, workerID)
|
|
}
|
|
tsw.rowsProcessed = len(rs.Values)
|
|
tsw.doneCh <- nil
|
|
currentTime := fasttime.UnixTimestamp()
|
|
if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) && currentTime-rsLastResetTime > 10 {
|
|
// Reset rs in order to preseve memory usage after processing big time series with millions of rows.
|
|
rs = Result{}
|
|
rsLastResetTime = currentTime
|
|
}
|
|
}
|
|
}
|
|
|
|
// RunParallel runs f in parallel for all the results from rss.
|
|
//
|
|
// f shouldn't hold references to rs after returning.
|
|
// workerID is the id of the worker goroutine that calls f.
|
|
//
|
|
// rss becomes unusable after the call to RunParallel.
|
|
func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
|
defer rss.mustClose()
|
|
|
|
// Feed workers with work.
|
|
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
|
|
for i := range rss.packedTimeseries {
|
|
tsw := ×eriesWork{
|
|
rss: rss,
|
|
pts: &rss.packedTimeseries[i],
|
|
f: f,
|
|
doneCh: make(chan error, 1),
|
|
}
|
|
timeseriesWorkCh <- tsw
|
|
tsws[i] = tsw
|
|
}
|
|
seriesProcessedTotal := len(rss.packedTimeseries)
|
|
rss.packedTimeseries = rss.packedTimeseries[:0]
|
|
|
|
// Wait until work is complete.
|
|
var firstErr error
|
|
rowsProcessedTotal := 0
|
|
for _, tsw := range tsws {
|
|
if err := <-tsw.doneCh; err != nil && firstErr == nil {
|
|
// Return just the first error, since other errors
|
|
// are likely duplicate the first error.
|
|
firstErr = err
|
|
}
|
|
rowsProcessedTotal += tsw.rowsProcessed
|
|
}
|
|
|
|
perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
|
|
perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
|
|
return firstErr
|
|
}
|
|
|
|
var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
|
|
var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
|
|
|
|
var gomaxprocs = runtime.GOMAXPROCS(-1)
|
|
|
|
type packedTimeseries struct {
|
|
metricName string
|
|
brs []storage.BlockRef
|
|
}
|
|
|
|
var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
|
|
|
|
type unpackWorkItem struct {
|
|
br storage.BlockRef
|
|
tr storage.TimeRange
|
|
}
|
|
|
|
type unpackWork struct {
|
|
ws []unpackWorkItem
|
|
fetchData bool
|
|
sbs []*sortBlock
|
|
doneCh chan error
|
|
}
|
|
|
|
func (upw *unpackWork) reset() {
|
|
ws := upw.ws
|
|
for i := range ws {
|
|
w := &ws[i]
|
|
w.br = storage.BlockRef{}
|
|
w.tr = storage.TimeRange{}
|
|
}
|
|
upw.ws = upw.ws[:0]
|
|
upw.fetchData = false
|
|
sbs := upw.sbs
|
|
for i := range sbs {
|
|
sbs[i] = nil
|
|
}
|
|
upw.sbs = upw.sbs[:0]
|
|
if n := len(upw.doneCh); n > 0 {
|
|
logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
|
|
}
|
|
}
|
|
|
|
func (upw *unpackWork) unpack() {
|
|
for _, w := range upw.ws {
|
|
sb := getSortBlock()
|
|
if err := sb.unpackFrom(w.br, w.tr, upw.fetchData); err != nil {
|
|
putSortBlock(sb)
|
|
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
|
return
|
|
}
|
|
upw.sbs = append(upw.sbs, sb)
|
|
}
|
|
upw.doneCh <- nil
|
|
}
|
|
|
|
func getUnpackWork() *unpackWork {
|
|
v := unpackWorkPool.Get()
|
|
if v != nil {
|
|
return v.(*unpackWork)
|
|
}
|
|
return &unpackWork{
|
|
doneCh: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
func putUnpackWork(upw *unpackWork) {
|
|
upw.reset()
|
|
unpackWorkPool.Put(upw)
|
|
}
|
|
|
|
var unpackWorkPool sync.Pool
|
|
|
|
func init() {
|
|
for i := 0; i < gomaxprocs; i++ {
|
|
go unpackWorker()
|
|
}
|
|
}
|
|
|
|
func unpackWorker() {
|
|
for upw := range unpackWorkCh {
|
|
upw.unpack()
|
|
}
|
|
}
|
|
|
|
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
|
|
//
|
|
// This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system.
|
|
var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
|
|
|
|
// Unpack unpacks pts to dst.
|
|
func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
|
|
dst.reset()
|
|
|
|
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
|
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
|
}
|
|
|
|
// Feed workers with work
|
|
upws := make([]*unpackWork, 0, 1+len(pts.brs)/unpackBatchSize)
|
|
upw := getUnpackWork()
|
|
upw.fetchData = fetchData
|
|
for _, br := range pts.brs {
|
|
if len(upw.ws) >= unpackBatchSize {
|
|
unpackWorkCh <- upw
|
|
upws = append(upws, upw)
|
|
upw = getUnpackWork()
|
|
upw.fetchData = fetchData
|
|
}
|
|
upw.ws = append(upw.ws, unpackWorkItem{
|
|
br: br,
|
|
tr: tr,
|
|
})
|
|
}
|
|
unpackWorkCh <- upw
|
|
upws = append(upws, upw)
|
|
pts.brs = pts.brs[:0]
|
|
|
|
// Wait until work is complete
|
|
sbs := make([]*sortBlock, 0, len(pts.brs))
|
|
var firstErr error
|
|
for _, upw := range upws {
|
|
if err := <-upw.doneCh; err != nil && firstErr == nil {
|
|
// Return the first error only, since other errors are likely the same.
|
|
firstErr = err
|
|
}
|
|
if firstErr == nil {
|
|
sbs = append(sbs, upw.sbs...)
|
|
} else {
|
|
for _, sb := range upw.sbs {
|
|
putSortBlock(sb)
|
|
}
|
|
}
|
|
putUnpackWork(upw)
|
|
}
|
|
if firstErr != nil {
|
|
return firstErr
|
|
}
|
|
mergeSortBlocks(dst, sbs)
|
|
return nil
|
|
}
|
|
|
|
func getSortBlock() *sortBlock {
|
|
v := sbPool.Get()
|
|
if v == nil {
|
|
return &sortBlock{}
|
|
}
|
|
return v.(*sortBlock)
|
|
}
|
|
|
|
func putSortBlock(sb *sortBlock) {
|
|
sb.reset()
|
|
sbPool.Put(sb)
|
|
}
|
|
|
|
var sbPool sync.Pool
|
|
|
|
var metricRowsSkipped = metrics.NewCounter(`vm_metric_rows_skipped_total{name="vmselect"}`)
|
|
|
|
func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
|
|
// Skip empty sort blocks, since they cannot be passed to heap.Init.
|
|
src := sbh
|
|
sbh = sbh[:0]
|
|
for _, sb := range src {
|
|
if len(sb.Timestamps) == 0 {
|
|
putSortBlock(sb)
|
|
continue
|
|
}
|
|
sbh = append(sbh, sb)
|
|
}
|
|
if len(sbh) == 0 {
|
|
return
|
|
}
|
|
heap.Init(&sbh)
|
|
for {
|
|
top := sbh[0]
|
|
heap.Pop(&sbh)
|
|
if len(sbh) == 0 {
|
|
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...)
|
|
dst.Values = append(dst.Values, top.Values[top.NextIdx:]...)
|
|
putSortBlock(top)
|
|
break
|
|
}
|
|
sbNext := sbh[0]
|
|
tsNext := sbNext.Timestamps[sbNext.NextIdx]
|
|
idxNext := len(top.Timestamps)
|
|
if top.Timestamps[idxNext-1] > tsNext {
|
|
idxNext = top.NextIdx
|
|
for top.Timestamps[idxNext] <= tsNext {
|
|
idxNext++
|
|
}
|
|
}
|
|
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:idxNext]...)
|
|
dst.Values = append(dst.Values, top.Values[top.NextIdx:idxNext]...)
|
|
if idxNext < len(top.Timestamps) {
|
|
top.NextIdx = idxNext
|
|
heap.Push(&sbh, top)
|
|
} else {
|
|
// Return top to the pool.
|
|
putSortBlock(top)
|
|
}
|
|
}
|
|
|
|
timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values)
|
|
dedups := len(dst.Timestamps) - len(timestamps)
|
|
dedupsDuringSelect.Add(dedups)
|
|
dst.Timestamps = timestamps
|
|
dst.Values = values
|
|
}
|
|
|
|
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
|
|
|
|
type sortBlock struct {
|
|
// b is used as a temporary storage for unpacked rows before they
|
|
// go to Timestamps and Values.
|
|
b storage.Block
|
|
|
|
Timestamps []int64
|
|
Values []float64
|
|
NextIdx int
|
|
}
|
|
|
|
func (sb *sortBlock) reset() {
|
|
sb.b.Reset()
|
|
sb.Timestamps = sb.Timestamps[:0]
|
|
sb.Values = sb.Values[:0]
|
|
sb.NextIdx = 0
|
|
}
|
|
|
|
func (sb *sortBlock) unpackFrom(br storage.BlockRef, tr storage.TimeRange, fetchData bool) error {
|
|
br.MustReadBlock(&sb.b, fetchData)
|
|
if fetchData {
|
|
if err := sb.b.UnmarshalData(); err != nil {
|
|
return fmt.Errorf("cannot unmarshal block: %w", err)
|
|
}
|
|
}
|
|
timestamps := sb.b.Timestamps()
|
|
|
|
// Skip timestamps smaller than tr.MinTimestamp.
|
|
i := 0
|
|
for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
|
|
i++
|
|
}
|
|
|
|
// Skip timestamps bigger than tr.MaxTimestamp.
|
|
j := len(timestamps)
|
|
for j > i && timestamps[j-1] > tr.MaxTimestamp {
|
|
j--
|
|
}
|
|
skippedRows := sb.b.RowsCount() - (j - i)
|
|
metricRowsSkipped.Add(skippedRows)
|
|
|
|
// Copy the remaining values.
|
|
if i == j {
|
|
return nil
|
|
}
|
|
values := sb.b.Values()
|
|
sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...)
|
|
sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], sb.b.Scale())
|
|
return nil
|
|
}
|
|
|
|
type sortBlocksHeap []*sortBlock
|
|
|
|
func (sbh sortBlocksHeap) Len() int {
|
|
return len(sbh)
|
|
}
|
|
|
|
func (sbh sortBlocksHeap) Less(i, j int) bool {
|
|
a := sbh[i]
|
|
b := sbh[j]
|
|
return a.Timestamps[a.NextIdx] < b.Timestamps[b.NextIdx]
|
|
}
|
|
|
|
func (sbh sortBlocksHeap) Swap(i, j int) {
|
|
sbh[i], sbh[j] = sbh[j], sbh[i]
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Push(x interface{}) {
|
|
*sbh = append(*sbh, x.(*sortBlock))
|
|
}
|
|
|
|
func (sbh *sortBlocksHeap) Pop() interface{} {
|
|
a := *sbh
|
|
v := a[len(a)-1]
|
|
*sbh = a[:len(a)-1]
|
|
return v
|
|
}
|
|
|
|
// DeleteSeries deletes time series matching the given tagFilterss.
|
|
func DeleteSeries(sq *storage.SearchQuery) (int, error) {
|
|
tfss, err := setupTfss(sq.TagFilterss)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return vmstorage.DeleteMetrics(tfss)
|
|
}
|
|
|
|
// GetLabels returns labels until the given deadline.
|
|
func GetLabels(deadline Deadline) ([]string, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
labels, err := vmstorage.SearchTagKeys(*maxTagKeysPerSearch, deadline.deadline)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error during labels search: %w", err)
|
|
}
|
|
|
|
// Substitute "" with "__name__"
|
|
for i := range labels {
|
|
if labels[i] == "" {
|
|
labels[i] = "__name__"
|
|
}
|
|
}
|
|
|
|
// Sort labels like Prometheus does
|
|
sort.Strings(labels)
|
|
|
|
return labels, nil
|
|
}
|
|
|
|
// GetLabelValues returns label values for the given labelName
|
|
// until the given deadline.
|
|
func GetLabelValues(labelName string, deadline Deadline) ([]string, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
if labelName == "__name__" {
|
|
labelName = ""
|
|
}
|
|
|
|
// Search for tag values
|
|
labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.deadline)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err)
|
|
}
|
|
|
|
// Sort labelValues like Prometheus does
|
|
sort.Strings(labelValues)
|
|
|
|
return labelValues, nil
|
|
}
|
|
|
|
// GetLabelEntries returns all the label entries until the given deadline.
|
|
func GetLabelEntries(deadline Deadline) ([]storage.TagEntry, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch, deadline.deadline)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error during label entries request: %w", err)
|
|
}
|
|
|
|
// Substitute "" with "__name__"
|
|
for i := range labelEntries {
|
|
e := &labelEntries[i]
|
|
if e.Key == "" {
|
|
e.Key = "__name__"
|
|
}
|
|
}
|
|
|
|
// Sort labelEntries by the number of label values in each entry.
|
|
sort.Slice(labelEntries, func(i, j int) bool {
|
|
a, b := labelEntries[i].Values, labelEntries[j].Values
|
|
if len(a) != len(b) {
|
|
return len(a) > len(b)
|
|
}
|
|
return labelEntries[i].Key > labelEntries[j].Key
|
|
})
|
|
|
|
return labelEntries, nil
|
|
}
|
|
|
|
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
|
func GetTSDBStatusForDate(deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
status, err := vmstorage.GetTSDBStatusForDate(date, topN, deadline.deadline)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error during tsdb status request: %w", err)
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
// GetSeriesCount returns the number of unique series.
|
|
func GetSeriesCount(deadline Deadline) (uint64, error) {
|
|
if deadline.Exceeded() {
|
|
return 0, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
n, err := vmstorage.GetSeriesCount(deadline.deadline)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("error during series count request: %w", err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func getStorageSearch() *storage.Search {
|
|
v := ssPool.Get()
|
|
if v == nil {
|
|
return &storage.Search{}
|
|
}
|
|
return v.(*storage.Search)
|
|
}
|
|
|
|
func putStorageSearch(sr *storage.Search) {
|
|
sr.MustClose()
|
|
ssPool.Put(sr)
|
|
}
|
|
|
|
var ssPool sync.Pool
|
|
|
|
// ProcessSearchQuery performs sq on storage nodes until the given deadline.
|
|
//
|
|
// Results.RunParallel or Results.Cancel must be called on the returned Results.
|
|
func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) {
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
|
}
|
|
|
|
// Setup search.
|
|
tfss, err := setupTfss(sq.TagFilterss)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tr := storage.TimeRange{
|
|
MinTimestamp: sq.MinTimestamp,
|
|
MaxTimestamp: sq.MaxTimestamp,
|
|
}
|
|
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
vmstorage.WG.Add(1)
|
|
defer vmstorage.WG.Done()
|
|
|
|
sr := getStorageSearch()
|
|
maxSeriesCount := sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.deadline)
|
|
|
|
m := make(map[string][]storage.BlockRef, maxSeriesCount)
|
|
orderedMetricNames := make([]string, 0, maxSeriesCount)
|
|
blocksRead := 0
|
|
for sr.NextMetricBlock() {
|
|
blocksRead++
|
|
if deadline.Exceeded() {
|
|
return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
|
|
}
|
|
metricName := sr.MetricBlockRef.MetricName
|
|
brs := m[string(metricName)]
|
|
brs = append(brs, *sr.MetricBlockRef.BlockRef)
|
|
if len(brs) > 1 {
|
|
// An optimization: do not allocate a string for already existing metricName key in m
|
|
m[string(metricName)] = brs
|
|
} else {
|
|
// An optimization for big number of time series with long metricName values:
|
|
// use only a single copy of metricName for both orderedMetricNames and m.
|
|
orderedMetricNames = append(orderedMetricNames, string(metricName))
|
|
m[orderedMetricNames[len(orderedMetricNames)-1]] = brs
|
|
}
|
|
}
|
|
if err := sr.Error(); err != nil {
|
|
if errors.Is(err, storage.ErrDeadlineExceeded) {
|
|
return nil, fmt.Errorf("timeout exceeded during the query: %s", deadline.String())
|
|
}
|
|
return nil, fmt.Errorf("search error after reading %d data blocks: %w", blocksRead, err)
|
|
}
|
|
|
|
var rss Results
|
|
rss.tr = tr
|
|
rss.fetchData = fetchData
|
|
rss.deadline = deadline
|
|
pts := make([]packedTimeseries, len(orderedMetricNames))
|
|
for i, metricName := range orderedMetricNames {
|
|
pts[i] = packedTimeseries{
|
|
metricName: metricName,
|
|
brs: m[metricName],
|
|
}
|
|
}
|
|
rss.packedTimeseries = pts
|
|
rss.sr = sr
|
|
return &rss, nil
|
|
}
|
|
|
|
func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) {
|
|
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
|
|
for _, tagFilters := range tagFilterss {
|
|
tfs := storage.NewTagFilters()
|
|
for i := range tagFilters {
|
|
tf := &tagFilters[i]
|
|
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
|
|
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
|
|
}
|
|
}
|
|
tfss = append(tfss, tfs)
|
|
tfss = append(tfss, tfs.Finalize()...)
|
|
}
|
|
return tfss, nil
|
|
}
|
|
|
|
// Deadline contains deadline with the corresponding timeout for pretty error messages.
|
|
type Deadline struct {
|
|
deadline uint64
|
|
|
|
timeout time.Duration
|
|
flagHint string
|
|
}
|
|
|
|
// NewDeadline returns deadline for the given timeout.
|
|
//
|
|
// flagHint must contain a hit for command-line flag, which could be used
|
|
// in order to increase timeout.
|
|
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
|
|
return Deadline{
|
|
deadline: uint64(startTime.Add(timeout).Unix()),
|
|
timeout: timeout,
|
|
flagHint: flagHint,
|
|
}
|
|
}
|
|
|
|
// Exceeded returns true if deadline is exceeded.
|
|
func (d *Deadline) Exceeded() bool {
|
|
return fasttime.UnixTimestamp() > d.deadline
|
|
}
|
|
|
|
// String returns human-readable string representation for d.
|
|
func (d *Deadline) String() string {
|
|
return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint)
|
|
}
|