Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2021-02-27 00:25:01 +02:00
commit edb2ab7d8e
4 changed files with 21 additions and 7 deletions

View file

@ -178,7 +178,7 @@ func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window in
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb) defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilters)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
if len(metainfoBuf) == 0 { if len(metainfoBuf) == 0 {
return nil, ec.Start return nil, ec.Start
@ -198,7 +198,7 @@ func (rrc *rollupResultCache) Get(ec *EvalConfig, expr metricsql.Expr, window in
if len(compressedResultBuf.B) == 0 { if len(compressedResultBuf.B) == 0 {
mi.RemoveKey(key) mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0]) metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilters)
rrc.c.Set(bb.B, metainfoBuf) rrc.c.Set(bb.B, metainfoBuf)
return nil, ec.Start return nil, ec.Start
} }
@ -301,7 +301,7 @@ func (rrc *rollupResultCache) Put(ec *EvalConfig, expr metricsql.Expr, window in
bb.B = key.Marshal(bb.B[:0]) bb.B = key.Marshal(bb.B[:0])
rrc.c.SetBig(bb.B, compressedResultBuf.B) rrc.c.SetBig(bb.B, compressedResultBuf.B)
bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step) bb.B = marshalRollupResultCacheKey(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilters)
metainfoBuf := rrc.c.Get(nil, bb.B) metainfoBuf := rrc.c.Get(nil, bb.B)
var mi rollupResultCacheMetainfo var mi rollupResultCacheMetainfo
if len(metainfoBuf) > 0 { if len(metainfoBuf) > 0 {
@ -331,11 +331,14 @@ var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes. // Increment this value every time the format of the cache changes.
const rollupResultCacheVersion = 7 const rollupResultCacheVersion = 7
func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64) []byte { func marshalRollupResultCacheKey(dst []byte, expr metricsql.Expr, window, step int64, filters []storage.TagFilter) []byte {
dst = append(dst, rollupResultCacheVersion) dst = append(dst, rollupResultCacheVersion)
dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step) dst = encoding.MarshalInt64(dst, step)
dst = expr.AppendString(dst) dst = expr.AppendString(dst)
for _, f := range filters {
dst = f.Marshal(dst)
}
return dst return dst
} }

View file

@ -17,11 +17,12 @@
* FEATURE: add `increase_pure(m[d])` function to MetricsQL. It works the same as `increase(m[d])` except of various edge cases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962) for details. * FEATURE: add `increase_pure(m[d])` function to MetricsQL. It works the same as `increase(m[d])` except of various edge cases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962) for details.
* FEATURE: increase accuracy for `buckets_limit(limit, buckets)` results for small `limit` values. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for details. * FEATURE: increase accuracy for `buckets_limit(limit, buckets)` results for small `limit` values. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for details.
* BUGFIX: vmagent: properly perform graceful shutdown on `SIGINT` and `SIGTERM` signals. The graceful shutdown has been broken in `v1.54.0`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065 * BUGFIX: vmagent: properly perform graceful shutdown on `SIGINT` and `SIGTERM` signals. The graceful shutdown has been broken in `v1.54.0`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
* BUGFIX: reduce the probability of `duplicate time series` errors when querying Kubernetes metrics. * BUGFIX: reduce the probability of `duplicate time series` errors when querying Kubernetes metrics.
* BUGFIX: properly calculate `histogram_quantile()` over time series with only a single non-zero bucket with `{le="+Inf"}`. Previously `NaN` was returned, now the value for the last bucket before `{le="+Inf"}` is returned like Prometheus does. * BUGFIX: properly calculate `histogram_quantile()` over time series with only a single non-zero bucket with `{le="+Inf"}`. Previously `NaN` was returned, now the value for the last bucket before `{le="+Inf"}` is returned like Prometheus does.
* BUGFIX: vmselect: do not cache partial query results on timeout when receiving data from `vmstorage` nodes. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1085 * BUGFIX: vmselect: do not cache partial query results on timeout when receiving data from `vmstorage` nodes. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1085
* BUGFIX: properly handle `stale NFS file handle` error.
* BUGFIX: properly cache query results when `extra_label` query arg is used. Previously the cached results could clash for different `extra_label` values. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1095
# [v1.54.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.54.1) # [v1.54.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.54.1)

View file

@ -13,7 +13,7 @@ import (
func mustRemoveAll(path string, done func()) bool { func mustRemoveAll(path string, done func()) bool {
err := os.RemoveAll(path) err := os.RemoveAll(path)
if err == nil { if err == nil || isStaleNFSFileHandleError(err) {
// Make sure the parent directory doesn't contain references // Make sure the parent directory doesn't contain references
// to the current directory. // to the current directory.
mustSyncParentDirIfExists(path) mustSyncParentDirIfExists(path)
@ -87,6 +87,11 @@ func dirRemover() {
} }
} }
func isStaleNFSFileHandleError(err error) bool {
errStr := err.Error()
return strings.Contains(errStr, "stale NFS file handle")
}
func isTemporaryNFSError(err error) bool { func isTemporaryNFSError(err error) bool {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 for details.
errStr := err.Error() errStr := err.Error()

View file

@ -563,8 +563,13 @@ func (swc *scrapeWorkCache) Get(key string) *ScrapeWork {
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
swc.mu.Lock() swc.mu.Lock()
swe := swc.m[key] swe := swc.m[key]
swe.lastAccessTime = currentTime if swe != nil {
swe.lastAccessTime = currentTime
}
swc.mu.Unlock() swc.mu.Unlock()
if swe == nil {
return nil
}
return swe.sw return swe.sw
} }