Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2021-02-18 14:53:19 +02:00
commit 3588687f84
17 changed files with 1466 additions and 1307 deletions


@ -0,0 +1,17 @@
{
"name": "graphite-selector",
"issue": "",
"data": [
"graphite-selector.bar.baz 1 {TIME_S-1m}",
"graphite-selector.xxx.yy 2 {TIME_S-1m}",
"graphite-selector.bb.cc 3 {TIME_S-1m}",
"graphite-selector.a.baz 4 {TIME_S-1m}"],
"query": ["/api/v1/query?query=sort({__graphite__='graphite-selector.*.baz'})&time={TIME_S-1m}"],
"result_query": {
"status":"success",
"data":{"resultType":"vector","result":[
{"metric":{"__name__":"graphite-selector.bar.baz"},"value":["{TIME_S-1m}","1"]},
{"metric":{"__name__":"graphite-selector.a.baz"},"value":["{TIME_S-1m}","4"]}
]}
}
}


@ -0,0 +1,8 @@
{
"name": "basic_select_with_extra_labels",
"data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.tenant.limits\"},{\"name\":\"baz\",\"value\":\"qux\"},{\"name\":\"tenant\",\"value\":\"dev\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]},{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.up\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"],
"query": ["/api/v1/export?match={__name__!=''}&extra_label=tenant=dev"],
"result_metrics": [
{"metric":{"__name__":"prometheus.tenant.limits","baz":"qux","tenant": "dev"},"values":[100000], "timestamps": ["{TIME_MS}"]}
]
}


@ -171,36 +171,45 @@ func getTLSConfig(argIdx int) (*tls.Config, error) {
func (c *client) runWorker() {
var ok bool
var block []byte
ch := make(chan struct{})
ch := make(chan bool, 1)
for {
block, ok = c.fq.MustReadBlock(block[:0])
if !ok {
return
}
go func() {
c.sendBlock(block)
ch <- struct{}{}
ch <- c.sendBlock(block)
}()
select {
case <-ch:
// The block has been sent successfully
continue
case ok := <-ch:
if ok {
// The block has been sent successfully
continue
}
// Return unsent block to the queue.
c.fq.MustWriteBlock(block)
return
case <-c.stopCh:
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time.Second
select {
case <-ch:
// The block has been sent successfully.
case ok := <-ch:
if !ok {
// Return unsent block to the queue.
c.fq.MustWriteBlock(block)
}
case <-time.After(graceDuration):
logger.Errorf("couldn't sent block with size %d bytes to %q in %.3f seconds during shutdown; dropping it",
len(block), c.sanitizedURL, graceDuration.Seconds())
// Return unsent block to the queue.
c.fq.MustWriteBlock(block)
}
return
}
}
}
func (c *client) sendBlock(block []byte) {
// sendBlock returns false only if c.stopCh is closed.
// Otherwise it tries sending the block to remote storage indefinitely.
func (c *client) sendBlock(block []byte) bool {
c.rl.register(len(block), c.stopCh)
retryDuration := time.Second
retriesCount := 0
@ -236,7 +245,7 @@ again:
select {
case <-c.stopCh:
timerpool.Put(t)
return
return false
case <-t.C:
timerpool.Put(t)
}
@ -247,7 +256,7 @@ again:
if statusCode/100 == 2 {
_ = resp.Body.Close()
c.requestsOKCount.Inc()
return
return true
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
if statusCode == 409 {
@ -258,7 +267,7 @@ again:
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+
"response body=%q", len(block), c.sanitizedURL, statusCode, body)
c.packetsDropped.Inc()
return
return true
}
// Unexpected status code returned
@ -279,7 +288,7 @@ again:
select {
case <-c.stopCh:
timerpool.Put(t)
return
return false
case <-t.C:
timerpool.Put(t)
}


@ -227,10 +227,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
}
rwctx.idx = 0
rwctx.pss = nil
rwctx.fq.MustClose()
rwctx.fq = nil
rwctx.c.MustStop()
rwctx.c = nil
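// Close the queue only after the client has stopped, so the client can return an unsent block to the queue during shutdown.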
rwctx.fq.MustClose()
rwctx.fq = nil
rwctx.relabelMetricsDropped = nil
}


@ -452,6 +452,12 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_composite_index_min_timestamp`, func() float64 {
return float64(idbm().MinTimestampForCompositeIndex) / 1e3
})
metrics.NewGauge(`vm_composite_filter_success_conversions_total`, func() float64 {
return float64(idbm().CompositeFilterSuccessConversions)
})
metrics.NewGauge(`vm_composite_filter_missing_conversions_total`, func() float64 {
return float64(idbm().CompositeFilterMissingConversions)
})
metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallAssistedMerges)

File diff suppressed because it is too large.


@ -4,7 +4,7 @@ DOCKER_NAMESPACE := victoriametrics
ROOT_IMAGE ?= alpine:3.13.1
CERTS_IMAGE := alpine:3.13.1
GO_BUILDER_IMAGE := golang:1.15.8
GO_BUILDER_IMAGE := golang:1.16.0
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr : _)
BASE_IMAGE := local/base:1.1.3-$(shell echo $(ROOT_IMAGE) | tr : _)-$(shell echo $(CERTS_IMAGE) | tr : _)


@ -24,6 +24,7 @@
* [Monitoring Kubernetes with VictoriaMetrics+Prometheus](https://speakerdeck.com/bo0km4n/victoriametrics-plus-prometheusdegou-zhu-surufu-shu-kubernetesfalsejian-shi-ji-pan)
* [High-performance Graphite storage solution on top of VictoriaMetrics](https://golangexample.com/a-high-performance-graphite-storage-solution/)
* [Cloud Native Model Driven Telemetry Stack on OpenShift](https://cer6erus.medium.com/cloud-native-model-driven-telemetry-stack-on-openshift-80712621f5bc)
* [Observability, Availability & DORA's Research Program](https://medium.com/alteos-tech-blog/observability-availability-and-doras-research-program-85deb6680e78)
## Our articles


@ -2,8 +2,12 @@
# tip
# [v1.54.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.54.0)
* FEATURE: optimize searching for matching metrics for `metric{<label_filters>}` queries if `<label_filters>` contains at least a single filter. For example, the query `up{job="foobar"}` should find the matching time series much faster than previously.
* FEATURE: reduce execution times for `q1 <binary_op> q2` queries by executing `q1` and `q2` in parallel.
* FEATURE: switch from Go1.15 to [Go1.16](https://golang.org/doc/go1.16) for building prod binaries.
* FEATURE: single-node VictoriaMetrics now accepts requests to handlers with `/prometheus` and `/graphite` prefixes such as `/prometheus/api/v1/query`. This improves compatibility with [handlers from VictoriaMetrics cluster](https://victoriametrics.github.io/Cluster-VictoriaMetrics.html#url-format).
* FEATURE: expose `process_open_fds` and `process_max_fds` metrics. These metrics can be used for alerting when `process_open_fds` reaches `process_max_fds`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/402 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1037
* FEATURE: vmalert: add `-datasource.appendTypePrefix` command-line option for querying both Prometheus and Graphite datasources in the cluster version of VictoriaMetrics. See [these docs](https://victoriametrics.github.io/vmalert.html#graphite) for details.
@ -11,11 +15,13 @@
* FEATURE: remove dependency on external programs such as `cat`, `grep` and `cut` when detecting CPU and memory limits inside a Docker or LXC container.
* FEATURE: vmagent: add `__meta_kubernetes_endpoints_label_*`, `__meta_kubernetes_endpoints_labelpresent_*`, `__meta_kubernetes_endpoints_annotation_*` and `__meta_kubernetes_endpoints_annotationpresent_*` labels for `role: endpoints` in Kubernetes service discovery. These labels were added in Prometheus 2.25.
* FEATURE: reduce the minimum supported retention period for inverted index (aka `indexdb`) from one month to one day. This should reduce disk space usage for `<-storageDataPath>/indexdb` folder if `-retentionPeriod` is set to values smaller than one month.
* FEATURE: vmselect: export per-tenant metrics `vm_vmselect_http_requests_total` and `vm_vmselect_http_requests_duration_ms_total`. Other per-tenant metrics are available as a part of [enterprise package](https://victoriametrics.com/enterprise.html). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/932 for details.
* BUGFIX: properly convert regexp tag filters containing escaped dots to non-regexp tag filters. For example, `{foo=~"bar\.baz"}` should be converted to `{foo="bar.baz"}`. Previously it was incorrectly converted to `{foo="bar\.baz"}`, which could result in missing time series for this tag filter. A conceptual sketch of this conversion is shown after this list.
* BUGFIX: do not spam error logs when discovering Docker Swarm targets without dedicated IP. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1028 .
* BUGFIX: properly embed timezone data into VictoriaMetrics apps. This should fix `-loggerTimezone` usage inside Docker containers.
* BUGFIX: properly build Docker images for non-amd64 architectures (arm, arm64, ppc64le, 386) on [Docker hub](https://hub.docker.com/u/victoriametrics/). Previously these images were incorrectly based on amd64 base image, so they didn't work.
* BUGFIX: vmagent: return back unsent block to the queue during graceful shutdown. Previously this block could be dropped if the remote storage was unavailable during vmagent shutdown. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065 .
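
The regexp-to-exact-match conversion from the BUGFIX entry above can be illustrated with a small conceptual sketch. This is not the actual VictoriaMetrics code; it only shows how the standard regexp/syntax package reduces an escaped-dot expression such as `bar\.baz` to the literal `bar.baz`, which is a valid exact match because label-filter regexps are anchored.

package main

import (
	"fmt"
	"regexp/syntax"
)

// literalValue reports whether expr is a purely literal regexp (no
// metacharacters once escapes are resolved) and returns the literal string.
func literalValue(expr string) (string, bool) {
	re, err := syntax.Parse(expr, syntax.Perl)
	if err != nil {
		return "", false
	}
	re = re.Simplify()
	switch re.Op {
	case syntax.OpLiteral:
		return string(re.Rune), true
	case syntax.OpConcat:
		// A concatenation of plain literals is still a single literal string.
		var runes []rune
		for _, sub := range re.Sub {
			if sub.Op != syntax.OpLiteral {
				return "", false
			}
			runes = append(runes, sub.Rune...)
		}
		return string(runes), true
	}
	return "", false
}

func main() {
	fmt.Println(literalValue(`bar\.baz`)) // bar.baz true -> {foo=~"bar\.baz"} becomes {foo="bar.baz"}
	fmt.Println(literalValue(`bar.+baz`)) // false        -> must stay a regexp filter
}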
# [v1.53.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.53.1)


@ -105,13 +105,13 @@ See [Monitoring K8S with VictoriaMetrics](https://docs.google.com/presentation/d
Numbers:
* The number of active time series per VictoriaMetrics instance is 40M.
* The total number of time series per VictoriaMetrics instance is 5000M+.
* Ingestion rate per VictoriaMetrics instance is 1M data points per second.
* The number of active time series per VictoriaMetrics instance is 50 million.
* The total number of time series per VictoriaMetrics instance is 5000 million.
* Ingestion rate per VictoriaMetrics instance is 1.1 million data points per second.
* The total number of data points per VictoriaMetrics instance is 8.5 trillion.
* The average time series churn rate is ~80M per day.
* The average query rate is ~100 per second (mostly alert queries).
* Query duration: median is ~20ms, 99th percentile is ~1.5sec.
* The average churn rate is 150 million new time series per day.
* The average query rate is ~150 per second (mostly alert queries).
* Query duration: median is ~1ms, 99th percentile is ~1sec.
* Retention: 3 months.
> Alternatives that we've played with before choosing VictoriaMetrics are: federated Prometheus, Cortex, IronDB and Thanos.


@ -221,8 +221,8 @@ func (idxbc *indexBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
idxbc.mu.Lock()
for k, idxbe := range idxbc.m {
// Delete items accessed more than 90 seconds ago.
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 90 {
// Delete items accessed more than two minutes ago.
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 {
// do not call putIndexBlock(ibxbc.m[k]), since it
// may be used by concurrent goroutines.
delete(idxbc.m, k)
@ -375,8 +375,8 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
ibc.mu.Lock()
for k, ibe := range ibc.m {
// Delete items accessed more than 90 seconds ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 90 {
// Delete items accessed more than two minutes ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 {
// do not call putInmemoryBlock(ibc.m[k]), since it
// may be used by concurrent goroutines.
delete(ibc.m, k)


@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"math"
"path/filepath"
"sort"
"sync"
@ -205,7 +204,9 @@ type IndexDBMetrics struct {
IndexBlocksWithMetricIDsProcessed uint64
IndexBlocksWithMetricIDsIncorrectOrder uint64
MinTimestampForCompositeIndex uint64
MinTimestampForCompositeIndex uint64
CompositeFilterSuccessConversions uint64
CompositeFilterMissingConversions uint64
mergeset.TableMetrics
}
@ -249,6 +250,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
m.MinTimestampForCompositeIndex = uint64(db.minTimestampForCompositeIndex)
m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions)
m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions)
db.tb.UpdateMetrics(&m.TableMetrics)
db.doExtDB(func(extDB *indexDB) {
@ -2043,7 +2046,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
continue
}
metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics)
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
if err != nil {
if err == errFallbackToMetricNameMatch {
// Skip tag filters requiring to scan for too many metrics.
@ -2310,7 +2313,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
// Slow path - try searching over the whole inverted index.
// Sort tag filters for faster ts.Seek below.
sort.SliceStable(tfs.tfs, func(i, j int) bool {
sort.Slice(tfs.tfs, func(i, j int) bool {
return tfs.tfs[i].Less(&tfs.tfs[j])
})
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
@ -2365,34 +2368,36 @@ const (
var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, uint64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil {
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
return nil, loopsCount, err
}
return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil
return metricIDs, loopsCount, nil
}
// Slow path - scan for all the rows with the given prefix.
maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric
if err := is.getMetricIDsForTagFilterSlow(tf, nil, maxLoops, metricIDs.Add); err != nil {
maxLoopsCount := uint64(maxMetrics) * maxIndexScanSlowLoopsPerMetric
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, metricIDs.Add)
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
return nil, loopsCount, err
}
return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil
return metricIDs, loopsCount, nil
}
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoops int, f func(metricID uint64)) error {
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoopsCount uint64, f func(metricID uint64)) (uint64, error) {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
@ -2404,40 +2409,40 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
mp.Reset()
var prevMatchingSuffix []byte
var prevMatch bool
loops := 0
var loopsCount uint64
loopsPaceLimiter := 0
prefix := tf.prefix
ts.Seek(prefix)
for ts.NextItem() {
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
return loopsCount, err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return nil
return loopsCount, nil
}
tail := item[len(prefix):]
n := bytes.IndexByte(tail, tagSeparatorChar)
if n < 0 {
return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar)
return loopsCount, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar)
}
suffix := tail[:n+1]
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return err
return loopsCount, err
}
mp.ParseMetricIDs()
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errFallbackToMetricNameMatch
}
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
// slow tf.matchSuffix, which may call regexp.
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
for _, metricID := range mp.MetricIDs {
if filter != nil && !filter.Has(metricID) {
continue
@ -2451,11 +2456,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
// since the current row has no matching metricIDs.
continue
}
// Slow path: need tf.matchSuffix call.
ok, err := tf.matchSuffix(suffix)
loopsCount += reMatchCost
if err != nil {
return fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
}
if !ok {
prevMatch = false
@ -2470,18 +2475,16 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
// The last char in kb.B must be tagSeparatorChar. Just increment it
// in order to jump to the next tag value.
if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
return fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar)
return loopsCount, fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar)
}
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
// Assume that a seek cost is equivalent to 100 ordinary loops.
loopsCount += 100
continue
}
prevMatch = true
prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
for _, metricID := range mp.MetricIDs {
if filter != nil && !filter.Has(metricID) {
continue
@ -2490,29 +2493,32 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
}
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
}
return nil
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) error {
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
kb := kbPool.Get()
defer kbPool.Put(kb)
var loopsCount uint64
for _, orSuffix := range tf.orSuffixes {
kb.B = append(kb.B[:0], tf.prefix...)
kb.B = append(kb.B, orSuffix...)
kb.B = append(kb.B, tagSeparatorChar)
if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil {
return err
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs)
if err != nil {
return loopsCount, err
}
loopsCount += lc
if metricIDs.Len() >= maxMetrics {
return nil
return loopsCount, nil
}
}
return nil
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error {
@ -2530,39 +2536,39 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
return nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) error {
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
loops := 0
maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
for metricIDs.Len() < maxMetrics && ts.NextItem() {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
return loopsCount, err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return nil
return loopsCount, nil
}
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return err
return loopsCount, err
}
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
metricIDs.AddMulti(mp.MetricIDs)
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
}
return nil
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error {
@ -2574,8 +2580,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric
loops := 0
maxLoopsCount := uint64(len(sortedFilter)) * maxIndexScanLoopsPerMetric
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
var sf []uint64
@ -2606,8 +2612,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
return nil
}
sf = sortedFilter
loops += mp.MetricIDsLen()
if loops > maxLoops {
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
@ -2777,43 +2783,43 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
}
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
// Sort tfs by the duration from previous queries.
// Sort tfs by loopsCount needed for performing each filter.
// These stats are usually collected from previous queries.
// This way we limit the amount of work below by applying fast filters at first.
type tagFilterWithWeight struct {
tf *tagFilter
durationSeconds float64
loopsCount uint64
lastQueryTimestamp uint64
}
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
ct := fasttime.UnixTimestamp()
currentTime := fasttime.UnixTimestamp()
for i := range tfs.tfs {
tf := &tfs.tfs[i]
durationSeconds, lastQueryTimestamp := is.getDurationAndTimestampForDateFilter(date, tf)
if ct > lastQueryTimestamp+60 {
// It is time to update filter duration stats.
if tf.isNegative || tf.isRegexp && len(tf.orSuffixes) == 0 {
// Negative and regexp filters usually take the most time, so move them to the end of filters
// in the hope they won't be executed at all.
if durationSeconds == 0 {
durationSeconds = 10
}
} else {
// Reset duration stats for relatively fast {key="value"} and {key=~"foo|bar|baz"} filters, so it is re-populated below.
if durationSeconds < 0.5 {
durationSeconds = 0
}
loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
origLoopsCount := loopsCount
if currentTime > lastQueryTimestamp+60*60 {
// Reset loopsCount to 0 every hour for collecting updated stats for the tf.
loopsCount = 0
}
if loopsCount == 0 {
// Prevent a possible thundering herd issue when a heavy tf is executed from multiple concurrent queries
// by temporarily persisting its position in the tag filters list.
if origLoopsCount == 0 {
origLoopsCount = 10e6
}
lastQueryTimestamp = 0
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, lastQueryTimestamp)
}
tfws[i] = tagFilterWithWeight{
tf: tf,
durationSeconds: durationSeconds,
loopsCount: loopsCount,
lastQueryTimestamp: lastQueryTimestamp,
}
}
sort.SliceStable(tfws, func(i, j int) bool {
sort.Slice(tfws, func(i, j int) bool {
a, b := &tfws[i], &tfws[j]
if a.durationSeconds != b.durationSeconds {
return a.durationSeconds < b.durationSeconds
if a.loopsCount != b.loopsCount {
return a.loopsCount < b.loopsCount
}
return a.tf.Less(b.tf)
})
@ -2821,8 +2827,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Populate metricIDs for the first non-negative filter.
var tfsPostponed []*tagFilter
var metricIDs *uint64set.Set
maxDateMetrics := maxMetrics * 50
tfwsRemaining := tfws[:0]
maxDateMetrics := maxMetrics * 50
for i := range tfws {
tfw := tfws[i]
tf := tfw.tf
@ -2830,7 +2836,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
tfwsRemaining = append(tfwsRemaining, tfw)
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics)
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
if err != nil {
return nil, err
}
@ -2880,13 +2887,16 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Short circuit - there is no need in applying the remaining filters to an empty set.
break
}
if float64(metricIDsLen)/metricNameMatchesPerSecond < tfw.durationSeconds {
if uint64(metricIDsLen)*maxIndexScanLoopsPerMetric < tfw.loopsCount {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
tfsPostponed = append(tfsPostponed, tf)
// Store stats for the non-executed tf, since they could have been updated by the thundering herd protection above.
is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount, tfw.lastQueryTimestamp)
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics)
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
if err != nil {
return nil, err
}
@ -2916,12 +2926,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
return metricIDs, nil
}
// The estimated number of per-second loops inside updateMetricIDsByMetricNameMatch
//
// This value is used for determining when matching by metric name must be performed instead of matching
// by the remaining tag filters.
const metricNameMatchesPerSecond = 50000
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
ii := getIndexItems()
defer putIndexItems(ii)
@ -3068,7 +3072,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil
}
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, uint64, error) {
// Augment the tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
@ -3080,28 +3084,21 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
startTime := time.Now()
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
kbPool.Put(kb)
currentTimestamp := fasttime.UnixTimestamp()
if currentTimestamp > lastQueryTimestamp+5 {
// The cache already contains a quite fresh entry for the current (date, tf).
// Do not update it too frequently.
return metricIDs, err
if err != nil {
// Set a high loopsCount for the failing filter, so it is moved to the end of the filter list.
loopsCount = 1e9
}
// Store the duration for tag filter execution in the cache in order to sort tag filters
// in ascending durations on the next search.
durationSeconds := time.Since(startTime).Seconds()
if metricIDs.Len() >= maxMetrics {
// Increase the duration for tag filter matching too many metrics,
// So next time it will be applied after filters matching lower number of metrics.
durationSeconds *= 2
// Increase loopsCount for a tag filter matching too many metrics,
// so next time it is moved to the end of the filter list.
loopsCount *= 2
}
is.storeDurationAndTimestampForDateFilter(date, tf, durationSeconds, currentTimestamp)
return metricIDs, err
return metricIDs, loopsCount, err
}
func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tagFilter) (float64, uint64) {
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
kb := kbPool.Get()
defer kbPool.Put(kb)
@ -3109,18 +3106,22 @@ func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tag
if len(kb.B) != 16 {
return 0, 0
}
n := encoding.UnmarshalUint64(kb.B)
durationSeconds := math.Float64frombits(n)
loopsCount := encoding.UnmarshalUint64(kb.B)
timestamp := encoding.UnmarshalUint64(kb.B[8:])
return durationSeconds, timestamp
return loopsCount, timestamp
}
func (is *indexSearch) storeDurationAndTimestampForDateFilter(date uint64, tf *tagFilter, durationSeconds float64, timestamp uint64) {
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, prevTimestamp uint64) {
currentTimestamp := fasttime.UnixTimestamp()
if currentTimestamp < prevTimestamp+5 {
// The cache already contains a quite fresh entry for the current (date, tf).
// Do not update it too frequently.
return
}
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
n := math.Float64bits(durationSeconds)
kb := kbPool.Get()
kb.B = encoding.MarshalUint64(kb.B[:0], n)
kb.B = encoding.MarshalUint64(kb.B, timestamp)
kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
kbPool.Put(kb)
}
@ -3246,8 +3247,8 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
}
// Slow path - scan for all the rows with the given prefix.
maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, func(metricID uint64) {
maxLoopsCount := uint64(filter.Len()) * maxIndexScanSlowLoopsPerMetric
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, func(metricID uint64) {
if tf.isNegative {
// filter must be equal to metricIDs
metricIDs.Del(metricID)
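
The index_db.go changes above replace the duration-based ordering of tag filters with an ordering based on the recorded loopsCount, reset roughly once per hour so stale measurements do not stick. Below is a minimal conceptual sketch of that idea; the labelFilter and filterStats types are hypothetical stand-ins, not the actual index types.

package main

import (
	"fmt"
	"sort"
	"time"
)

// filterStats mimics a cached per-(date, filter) entry: the loops count
// recorded for the filter and when it was last measured.
type filterStats struct {
	loopsCount    uint64
	lastQueryTime time.Time
}

// labelFilter is a stand-in for a tag filter together with its cached stats.
type labelFilter struct {
	expr  string
	stats filterStats
}

// orderByCost resets stale stats and sorts filters so the historically
// cheapest ones are executed first.
func orderByCost(filters []labelFilter, now time.Time) {
	for i := range filters {
		if now.Sub(filters[i].stats.lastQueryTime) > time.Hour {
			// Stats older than an hour are reset, so fresh numbers get collected.
			filters[i].stats.loopsCount = 0
		}
	}
	sort.Slice(filters, func(i, j int) bool {
		return filters[i].stats.loopsCount < filters[j].stats.loopsCount
	})
}

func main() {
	now := time.Now()
	filters := []labelFilter{
		{expr: `job=~"web.*"`, stats: filterStats{loopsCount: 50000, lastQueryTime: now}},
		{expr: `instance="host-1"`, stats: filterStats{loopsCount: 120, lastQueryTime: now}},
	}
	orderByCost(filters, now)
	fmt.Println("executed first:", filters[0].expr) // the historically cheaper filter
}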


@ -225,8 +225,8 @@ func (ibc *indexBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
ibc.mu.Lock()
for k, ibe := range ibc.m {
// Delete items accessed more than 90 seconds ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 90 {
// Delete items accessed more than two minutes ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 {
delete(ibc.m, k)
}
}


@ -588,6 +588,8 @@ func (s *Storage) mustRotateIndexDB() {
}
// MustClose closes the storage.
//
// It is expected that s is no longer used during the close.
func (s *Storage) MustClose() {
close(s.stop)


@ -967,15 +967,16 @@ func testStorageAddRows(s *Storage) error {
return fmt.Errorf("error when force merging partitions: %w", err)
}
ptws := s1.tb.GetPartitions(nil)
defer s1.tb.PutPartitions(ptws)
for _, ptw := range ptws {
pws := ptw.pt.GetParts(nil)
numParts := len(pws)
ptw.pt.PutParts(pws)
if numParts != 1 {
s1.tb.PutPartitions(ptws)
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
}
}
s1.tb.PutPartitions(ptws)
s1.MustClose()


@ -189,6 +189,7 @@ func (tb *table) addPartitionNolock(pt *partition) {
}
// MustClose closes the table.
// It is expected that all the pending searches on the table are finished before calling MustClose.
func (tb *table) MustClose() {
close(tb.stop)
tb.retentionWatcherWG.Wait()
@ -198,9 +199,10 @@ func (tb *table) MustClose() {
tb.ptws = nil
tb.ptwsLock.Unlock()
// Decrement references to partitions, so they may be eventually closed after
// pending searches are done.
for _, ptw := range ptws {
if n := atomic.LoadUint64(&ptw.refCount); n != 1 {
logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
}
ptw.decRef()
}
@ -271,10 +273,10 @@ func (tb *table) AddRows(rows []rawRow) error {
ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
ptws := ptwsX.a
for _, ptw := range ptws {
for i, ptw := range ptws {
singlePt := true
for i := range rows {
if !ptw.pt.HasTimestamp(rows[i].Timestamp) {
for j := range rows {
if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
singlePt = false
break
}
@ -283,16 +285,18 @@ func (tb *table) AddRows(rows []rawRow) error {
continue
}
// Move the partition with the matching rows to the front of tb.ptws,
// so it will be detected faster next time.
tb.ptwsLock.Lock()
for i := range tb.ptws {
if ptw == tb.ptws[i] {
tb.ptws[0], tb.ptws[i] = tb.ptws[i], tb.ptws[0]
break
if i != 0 {
// Move the partition with the matching rows to the front of tb.ptws,
// so it will be detected faster next time.
tb.ptwsLock.Lock()
for j := range tb.ptws {
if ptw == tb.ptws[j] {
tb.ptws[0], tb.ptws[j] = tb.ptws[j], tb.ptws[0]
break
}
}
tb.ptwsLock.Unlock()
}
tb.ptwsLock.Unlock()
// Fast path - add all the rows into the ptw.
ptw.pt.AddRows(rows)


@ -36,6 +36,7 @@ func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
}
if len(name) == 0 {
// There is no metric name filter, so composite filters cannot be created.
atomic.AddUint64(&compositeFilterMissingConversions, 1)
return tfs
}
tfsNew := make([]tagFilter, 0, len(tfs.tfs))
@ -61,13 +62,20 @@ func convertToCompositeTagFilters(tfs *TagFilters) *TagFilters {
compositeFilters++
}
if compositeFilters == 0 {
atomic.AddUint64(&compositeFilterMissingConversions, 1)
return tfs
}
tfsCompiled := NewTagFilters()
tfsCompiled.tfs = tfsNew
atomic.AddUint64(&compositeFilterSuccessConversions, 1)
return tfsCompiled
}
var (
compositeFilterSuccessConversions uint64
compositeFilterMissingConversions uint64
)
// TagFilters represents filters used for filtering tags.
type TagFilters struct {
tfs []tagFilter
@ -213,7 +221,9 @@ type tagFilter struct {
value []byte
isNegative bool
isRegexp bool
matchCost uint64
// matchCost is a cost for matching a filter against a single string.
matchCost uint64
// Prefix always contains {nsPrefixTagToMetricIDs, key}.
// Additionally it contains:
@ -237,21 +247,31 @@ type tagFilter struct {
graphiteReverseSuffix []byte
}
func (tf *tagFilter) isComposite() bool {
k := tf.key
return len(k) > 0 && k[0] == compositeTagKeyPrefix
}
func (tf *tagFilter) Less(other *tagFilter) bool {
// Move regexp and negative filters to the end, since they require scanning
// all the entries for the given label.
// Move composite filters to the top, since they usually match a lower number of time series.
// Move regexp filters to the bottom, since they require scanning all the entries for the given label.
isCompositeA := tf.isComposite()
isCompositeB := other.isComposite()
if isCompositeA != isCompositeB {
return isCompositeA
}
if tf.matchCost != other.matchCost {
return tf.matchCost < other.matchCost
}
if tf.isNegative != other.isNegative {
return !tf.isNegative
}
if tf.isRegexp != other.isRegexp {
return !tf.isRegexp
}
if len(tf.orSuffixes) != len(other.orSuffixes) {
return len(tf.orSuffixes) < len(other.orSuffixes)
}
if tf.isNegative != other.isNegative {
return !tf.isNegative
}
return bytes.Compare(tf.prefix, other.prefix) < 0
}
@ -315,9 +335,6 @@ func (tf *tagFilter) InitFromGraphiteQuery(commonPrefix, query []byte, paths []s
tf.prefix = marshalTagValueNoTrailingTagSeparator(tf.prefix, []byte(prefix))
tf.orSuffixes = append(tf.orSuffixes[:0], orSuffixes...)
tf.reSuffixMatch, tf.matchCost = newMatchFuncForOrSuffixes(orSuffixes)
if isNegative {
tf.matchCost *= negativeMatchCostMultiplier
}
}
func getCommonPrefix(ss []string) (string, []string) {
@ -385,9 +402,6 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
tf.orSuffixes = append(tf.orSuffixes[:0], "")
tf.isEmptyMatch = len(prefix) == 0
tf.matchCost = fullMatchCost
if isNegative {
tf.matchCost *= negativeMatchCostMultiplier
}
return nil
}
rcv, err := getRegexpFromCache(expr)
@ -397,9 +411,6 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...)
tf.reSuffixMatch = rcv.reMatch
tf.matchCost = rcv.reCost
if isNegative {
tf.matchCost *= negativeMatchCostMultiplier
}
tf.isEmptyMatch = len(prefix) == 0 && tf.reSuffixMatch(nil)
if !tf.isNegative && len(key) == 0 && strings.IndexByte(rcv.literalSuffix, '.') >= 0 {
// Reverse suffix is needed only for non-negative regexp filters on __name__ that contains dots.
@ -571,8 +582,6 @@ const (
reMatchCost = 100
)
const negativeMatchCostMultiplier = 1000
func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp) (func(b []byte) bool, string, uint64) {
if isDotStar(sre) {
// '.*'
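
The tag_filters.go changes above add composite filters and give them priority in Less(). As a rough illustration of the idea only: a composite filter folds the metric name into the label key, so a query such as up{job="foobar"} can be answered by scanning a single composite index entry instead of intersecting separate per-label entries. The prefix byte and the length encoding below are assumptions made for the sketch, not the actual VictoriaMetrics index format.

package main

import "fmt"

// compositeTagKeyPrefix marks a composite (metric name + label key) entry.
const compositeTagKeyPrefix = byte(0xfe) // hypothetical value

// marshalCompositeTagKey builds a single index key from the metric name and a
// label key.
func marshalCompositeTagKey(name, key []byte) []byte {
	dst := make([]byte, 0, 2+len(name)+len(key))
	dst = append(dst, compositeTagKeyPrefix)
	dst = append(dst, byte(len(name))) // simplistic length prefix for the sketch
	dst = append(dst, name...)
	dst = append(dst, key...)
	return dst
}

func main() {
	fmt.Printf("%q\n", marshalCompositeTagKey([]byte("up"), []byte("job")))
}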