Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2022-10-21 01:11:40 +03:00
commit 22e87b0088
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 171 additions and 45 deletions

docs/CHANGELOG.md View file

@@ -39,7 +39,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow controlling staleness tracking on a per-[scrape_config](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) basis by specifying `no_stale_markers: true` or `no_stale_markers: false` option in the corresponding [scrape_config](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): limit the number of plotted series. This should prevent browser crashes or hangs when the query returns a big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3155).
* FEATURE: log error if some environment variables referred at `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent from using incorrect config files.
* FEATURE: log error if some environment variables referred to in `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent silently using incorrect config files.
* FEATURE: immediately shut down VictoriaMetrics apps on the second SIGINT or SIGTERM signal if they couldn't shut down gracefully for some reason after receiving the first signal.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representations of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_request_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225). A sketch of this `le` normalization follows this list.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts to send data to remote storage from `error` to `warn`. The message about all failed send attempts remains at the `error` severity level.
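
The bucket-merging fix above boils down to normalizing the `le` label through float parsing, so `"5"` and `"5.0"` collapse into one bucket before `histogram_quantile` runs. A minimal, hedged sketch of that idea in Go (the `bucket` type and `mergeBuckets` helper are illustrative, not MetricsQL internals):

```go
package main

import (
	"fmt"
	"strconv"
)

// bucket is a hypothetical cumulative counter for a single `le` bucket.
type bucket struct {
	le    string  // upper bound as scraped, e.g. "5" or "5.0"
	count float64 // cumulative counter value
}

// mergeBuckets sums buckets whose `le` bounds are numerically equal,
// even when their string representations differ.
func mergeBuckets(buckets []bucket) map[float64]float64 {
	merged := make(map[float64]float64)
	for _, b := range buckets {
		le, err := strconv.ParseFloat(b.le, 64)
		if err != nil {
			continue // skip buckets with unparsable bounds
		}
		merged[le] += b.count
	}
	return merged
}

func main() {
	bs := []bucket{
		{le: "5", count: 10},
		{le: "5.0", count: 7}, // same bound from a distinct target
		{le: "10", count: 20},
	}
	fmt.Println(mergeBuckets(bs)) // map[5:17 10:20]
}
```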

lib/encoding/encoding.go View file

@@ -42,6 +42,20 @@ const (
MarshalTypeNearestDelta = MarshalType(6)
)
// NeedsValidation returns true if mt may need additional validation to detect silent data corruption.
func (mt MarshalType) NeedsValidation() bool {
switch mt {
case MarshalTypeNearestDelta2,
MarshalTypeNearestDelta:
return true
default:
// Other types do not need additional validation,
// since they either already contain checksums (e.g. compressed data)
// or they are trivial and cannot be validated (e.g. const or delta const)
return false
}
}
// CheckMarshalType verifies whether the mt is valid.
func CheckMarshalType(mt MarshalType) error {
if mt < 0 || mt > 6 {

lib/mergeset/table.go View file

@@ -10,6 +10,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@@ -158,10 +159,20 @@ func (riss *rawItemsShards) Len() int {
return n
}
type rawItemsShard struct {
mu sync.Mutex
ibs []*inmemoryBlock
type rawItemsShardNopad struct {
// Put lastFlushTime at the top in order to avoid unaligned memory access on 32-bit architectures
lastFlushTime uint64
mu sync.Mutex
ibs []*inmemoryBlock
}
type rawItemsShard struct {
rawItemsShardNopad
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0.
_ [128 - unsafe.Sizeof(rawItemsShardNopad{})%128]byte
}
func (ris *rawItemsShard) Len() int {
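
The Nopad/padded split shown in this hunk rounds every shard up to a multiple of 128 bytes, so adjacent shards in a slice never share a cache line. A small sketch demonstrating the same sizing arithmetic (the `shardNopad` type is an illustrative stand-in, not the mergeset struct):

```go
package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// shardNopad mimics the layout trick: hot fields first,
// then padding is computed from the struct's own size.
type shardNopad struct {
	lastFlushTime uint64 // placed first for 8-byte alignment on 32-bit targets
	mu            sync.Mutex
	items         []int
}

type shard struct {
	shardNopad
	// Pad to the next multiple of 128 bytes so adjacent shards in a
	// []shard never share a cache line (cache lines are 64 or 128 bytes
	// on widespread platforms, and 128 mod (cache line size) = 0).
	_ [128 - unsafe.Sizeof(shardNopad{})%128]byte
}

func main() {
	fmt.Println(unsafe.Sizeof(shardNopad{}))  // e.g. 40 on 64-bit targets
	fmt.Println(unsafe.Sizeof(shard{}))       // rounded up, e.g. 128
	fmt.Println(unsafe.Sizeof(shard{}) % 128) // always 0
}
```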

lib/procutil/signal.go View file

@@ -26,6 +26,10 @@ func WaitForSigterm() os.Signal {
// Prevent the program from stopping on SIGHUP
continue
}
// Stop listening for SIGINT and SIGTERM signals,
// so the app can be interrupted by sending these signals again
// in case the caller doesn't stop the app gracefully.
signal.Stop(ch)
return sig
}
}
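
Because `signal.Stop(ch)` unregisters the handler before returning, a second SIGINT or SIGTERM falls through to the Go runtime's default disposition and kills the process even if graceful shutdown stalls. A minimal caller sketch under that assumption (the `gracefulShutdown` helper is hypothetical):

```go
package main

import (
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
)

func main() {
	// ... start the app's servers here ...

	sig := procutil.WaitForSigterm()
	log.Printf("received %s, shutting down gracefully", sig)

	// From this point the handler is unregistered (signal.Stop),
	// so a second SIGINT/SIGTERM terminates the process immediately
	// even if the cleanup below hangs.
	gracefulShutdown()
}

func gracefulShutdown() {
	// flush buffers, close listeners, etc. (may block on slow cleanup)
}
```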

lib/storage/block.go View file

@@ -272,7 +272,7 @@ func (b *Block) UnmarshalData() error {
if b.bh.PrecisionBits < 64 {
// Recover timestamps order after lossy compression.
encoding.EnsureNonDecreasingSequence(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp)
} else {
} else if b.bh.TimestampsMarshalType.NeedsValidation() {
// Ensure timestamps are in the range [MinTimestamp ... MaxTimestamp] and are ordered.
if err := checkTimestampsBounds(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp); err != nil {
return err
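
`checkTimestampsBounds` itself isn't part of this diff. A plausible sketch of what such a check has to cover for the nearest-delta marshal types flagged by `NeedsValidation` above, in-range and ordered timestamps, is shown below (an assumption about its shape, not the actual storage code):

```go
package main

import "fmt"

// checkTimestampsBoundsSketch mirrors what the validation called above
// must cover for marshal types without built-in checksums: every
// timestamp lies in the block header's [min..max] range and the
// sequence is non-decreasing.
func checkTimestampsBoundsSketch(timestamps []int64, minTimestamp, maxTimestamp int64) error {
	prev := minTimestamp
	for i, ts := range timestamps {
		if ts < minTimestamp || ts > maxTimestamp {
			return fmt.Errorf("timestamp %d at position %d is out of range [%d..%d]", ts, i, minTimestamp, maxTimestamp)
		}
		if ts < prev {
			return fmt.Errorf("timestamps not sorted at position %d: %d < %d", i, ts, prev)
		}
		prev = ts
	}
	return nil
}

func main() {
	err := checkTimestampsBoundsSketch([]int64{10, 20, 15}, 0, 100)
	fmt.Println(err) // timestamps not sorted at position 2: 15 < 20
}
```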

lib/storage/partition.go View file

@@ -445,10 +445,20 @@ func (rrss *rawRowsShards) Len() int {
return n
}
type rawRowsShard struct {
mu sync.Mutex
rows []rawRow
type rawRowsShardNopad struct {
// Put lastFlushTime at the top in order to avoid unaligned memory access on 32-bit architectures
lastFlushTime uint64
mu sync.Mutex
rows []rawRow
}
type rawRowsShard struct {
rawRowsShardNopad
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0.
_ [128 - unsafe.Sizeof(rawRowsShardNopad{})%128]byte
}
func (rrs *rawRowsShard) Len() int {
@@ -469,14 +479,24 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
maxRowsCount := cap(rrs.rows)
capacity := maxRowsCount - len(rrs.rows)
if capacity >= len(rows) {
// Fast path - rows fit capacity.
// Fast path - rows fit rrs.rows capacity.
rrs.rows = append(rrs.rows, rows...)
} else {
// Slow path - rows don't fit capacity.
// Put rrs.rows and rows to rowsToFlush and convert it to a part.
rowsToFlush = append(rowsToFlush, rrs.rows...)
rowsToFlush = append(rowsToFlush, rows...)
rrs.rows = rrs.rows[:0]
// Slow path - rows don't fit rrs.rows capacity.
// Fill rrs.rows with rows until capacity,
// then put rrs.rows to rowsToFlush and convert it to a part.
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
rrs.rows = rrs.rows[:len(rrs.rows)+n]
rows = rows[n:]
rowsToFlush = rrs.rows
n = getMaxRawRowsPerShard()
rrs.rows = make([]rawRow, 0, n)
if len(rows) <= n {
rrs.rows = append(rrs.rows[:0], rows...)
} else {
// The slowest path - rows do not fit rrs.rows capacity.
// So append them directly to rowsToFlush.
rowsToFlush = append(rowsToFlush, rows...)
}
atomic.StoreUint64(&rrs.lastFlushTime, fasttime.UnixTimestamp())
}
rrs.mu.Unlock()
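
The rewritten slow path fills the shard buffer to capacity, hands the full buffer off for flushing, and starts a fresh buffer with the leftovers, appending them straight to the flush batch only when even a fresh buffer couldn't hold them. A self-contained trace of the same logic on plain ints (names are illustrative):

```go
package main

import "fmt"

// addRowsSketch mimics the shard buffering above: fill buf to capacity,
// hand it off for flushing, then start a fresh buffer with the leftovers
// (or flush them too if they are oversized).
func addRowsSketch(buf []int, rows []int, shardCap int) (newBuf, toFlush []int) {
	if cap(buf)-len(buf) >= len(rows) {
		return append(buf, rows...), nil // fast path
	}
	n := copy(buf[len(buf):cap(buf)], rows) // fill to capacity
	buf = buf[:len(buf)+n]
	rows = rows[n:]
	toFlush = buf
	newBuf = make([]int, 0, shardCap)
	if len(rows) <= shardCap {
		newBuf = append(newBuf, rows...)
	} else {
		toFlush = append(toFlush, rows...) // slowest path
	}
	return newBuf, toFlush
}

func main() {
	buf := make([]int, 0, 5)
	buf = append(buf, 1, 2, 3)
	newBuf, toFlush := addRowsSketch(buf, []int{4, 5, 6, 7}, 5)
	fmt.Println(newBuf, toFlush) // [6 7] [1 2 3 4 5]
}
```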

lib/workingsetcache/cache.go View file

@@ -17,7 +17,7 @@ const (
whole = 2
)
const defaultExpireDuration = 10 * time.Minute
const defaultExpireDuration = 20 * time.Minute
// Cache is a cache for working set entries.
//
@@ -27,8 +27,8 @@ type Cache struct {
curr atomic.Value
prev atomic.Value
// cs holds cache stats
cs fastcache.Stats
// csHistory holds cache stats history
csHistory fastcache.Stats
// mode indicates whether to use only curr and skip prev.
//
@@ -42,7 +42,7 @@ type Cache struct {
maxBytes int
// mu serializes access to curr, prev and mode
// in expirationWatcher and cacheSizeWatcher.
// in expirationWatcher, prevCacheWatcher and cacheSizeWatcher.
mu sync.Mutex
wg sync.WaitGroup
@@ -122,6 +122,11 @@ func (c *Cache) runWatchers(expireDuration time.Duration) {
c.expirationWatcher(expireDuration)
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.prevCacheWatcher()
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.cacheSizeWatcher()
@@ -129,15 +134,15 @@ func (c *Cache) runWatchers(expireDuration time.Duration) {
}
func (c *Cache) expirationWatcher(expireDuration time.Duration) {
t := time.NewTicker(expireDuration / 2)
expireDuration += timeJitter(expireDuration / 10)
t := time.NewTicker(expireDuration)
defer t.Stop()
for {
select {
case <-c.stopCh:
t.Stop()
return
case <-t.C:
}
c.mu.Lock()
if atomic.LoadUint32(&c.mode) != split {
// Stop the expirationWatcher on non-split mode.
@@ -148,14 +153,67 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
prev := c.prev.Load().(*fastcache.Cache)
curr := c.curr.Load().(*fastcache.Cache)
c.prev.Store(curr)
var cs fastcache.Stats
prev.UpdateStats(&cs)
updateCacheStatsHistory(&c.csHistory, &cs)
prev.Reset()
c.curr.Store(prev)
c.mu.Unlock()
}
}
func (c *Cache) prevCacheWatcher() {
// Watch for the usage of the prev cache and drop it whenever it receives
// less than 5% of requests compared to the curr cache during the last 10 seconds.
checkInterval := 10 * time.Second
checkInterval += timeJitter(checkInterval / 10)
t := time.NewTicker(checkInterval)
defer t.Stop()
prevGetCalls := uint64(0)
currGetCalls := uint64(0)
for {
select {
case <-c.stopCh:
return
case <-t.C:
}
c.mu.Lock()
if atomic.LoadUint32(&c.mode) != split {
// Do nothing in non-split mode.
c.mu.Unlock()
return
}
prev := c.prev.Load().(*fastcache.Cache)
curr := c.curr.Load().(*fastcache.Cache)
var csCurr, csPrev fastcache.Stats
curr.UpdateStats(&csCurr)
prev.UpdateStats(&csPrev)
currRequests := csCurr.GetCalls
if currRequests >= currGetCalls {
currRequests -= currGetCalls
}
prevRequests := csPrev.GetCalls
if prevRequests >= prevGetCalls {
prevRequests -= prevGetCalls
}
currGetCalls = csCurr.GetCalls
prevGetCalls = csPrev.GetCalls
if currRequests >= 20 && float64(prevRequests)/float64(currRequests) < 0.05 {
// The majority of requests are served from the curr cache,
// so the prev cache can be deleted in order to free up memory.
if csPrev.EntriesCount > 0 {
updateCacheStatsHistory(&c.csHistory, &csPrev)
prev.Reset()
}
}
c.mu.Unlock()
}
}
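
`prevCacheWatcher` compares per-interval deltas of the monotonically growing `GetCalls` counters rather than their absolute values. A condensed sketch of that windowed-delta pattern (illustrative types, not the workingsetcache API):

```go
// deltaTracker reports how much a monotonically increasing counter grew
// since the previous call - the quantity the 5% prev/curr check needs.
type deltaTracker struct {
	last uint64
}

func (dt *deltaTracker) delta(total uint64) uint64 {
	var d uint64
	if total >= dt.last { // guard against counter resets, as above
		d = total - dt.last
	}
	dt.last = total
	return d
}

// Inside the ticker loop this mirrors prevCacheWatcher:
//
//	currReq := currTracker.delta(csCurr.GetCalls)
//	prevReq := prevTracker.delta(csPrev.GetCalls)
//	if currReq >= 20 && float64(prevReq)/float64(currReq) < 0.05 {
//		// prev cache is nearly idle - reset it to free memory
//	}
```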
func (c *Cache) cacheSizeWatcher() {
t := time.NewTicker(1500 * time.Millisecond)
checkInterval := 1500 * time.Millisecond
checkInterval += timeJitter(checkInterval / 10)
t := time.NewTicker(checkInterval)
defer t.Stop()
var maxBytesSize uint64
@@ -194,6 +252,9 @@ func (c *Cache) cacheSizeWatcher() {
prev := c.prev.Load().(*fastcache.Cache)
curr := c.curr.Load().(*fastcache.Cache)
c.prev.Store(curr)
var cs fastcache.Stats
prev.UpdateStats(&cs)
updateCacheStatsHistory(&c.csHistory, &cs)
prev.Reset()
// use c.maxBytes instead of maxBytesSize*2 for creating new cache, since otherwise the created cache
// couldn't be loaded from file with c.maxBytes limit after saving with maxBytesSize*2 limit.
@@ -218,6 +279,9 @@ func (c *Cache) cacheSizeWatcher() {
c.setMode(whole)
prev = c.prev.Load().(*fastcache.Cache)
c.prev.Store(fastcache.New(1024))
cs.Reset()
prev.UpdateStats(&cs)
updateCacheStatsHistory(&c.csHistory, &cs)
prev.Reset()
c.mu.Unlock()
}
@@ -241,9 +305,13 @@ func (c *Cache) Stop() {
// Reset resets the cache.
func (c *Cache) Reset() {
var cs fastcache.Stats
prev := c.prev.Load().(*fastcache.Cache)
prev.UpdateStats(&cs)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&cs)
updateCacheStatsHistory(&c.csHistory, &cs)
curr.Reset()
// Reset the mode to `split` in the hope the working set size becomes smaller after the reset.
c.setMode(split)
@@ -259,30 +327,43 @@ func (c *Cache) loadMode() int {
// UpdateStats updates fcs with cache stats.
func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
updateCacheStatsHistory(fcs, &c.csHistory)
var cs fastcache.Stats
curr := c.curr.Load().(*fastcache.Cache)
curr.UpdateStats(&cs)
fcs.Collisions += cs.Collisions
fcs.Corruptions += cs.Corruptions
fcs.EntriesCount += cs.EntriesCount
fcs.BytesSize += cs.BytesSize
fcs.MaxBytesSize += cs.MaxBytesSize
fcs.GetCalls += atomic.LoadUint64(&c.cs.GetCalls)
fcs.SetCalls += atomic.LoadUint64(&c.cs.SetCalls)
fcs.Misses += atomic.LoadUint64(&c.cs.Misses)
updateCacheStats(fcs, &cs)
prev := c.prev.Load().(*fastcache.Cache)
cs.Reset()
prev.UpdateStats(&cs)
fcs.EntriesCount += cs.EntriesCount
fcs.BytesSize += cs.BytesSize
fcs.MaxBytesSize += cs.MaxBytesSize
updateCacheStats(fcs, &cs)
}
func updateCacheStats(dst, src *fastcache.Stats) {
dst.GetCalls += src.GetCalls
dst.SetCalls += src.SetCalls
dst.Misses += src.Misses
dst.Collisions += src.Collisions
dst.Corruptions += src.Corruptions
dst.EntriesCount += src.EntriesCount
dst.BytesSize += src.BytesSize
dst.MaxBytesSize += src.MaxBytesSize
}
func updateCacheStatsHistory(dst, src *fastcache.Stats) {
atomic.AddUint64(&dst.GetCalls, atomic.LoadUint64(&src.GetCalls))
atomic.AddUint64(&dst.SetCalls, atomic.LoadUint64(&src.SetCalls))
atomic.AddUint64(&dst.Misses, atomic.LoadUint64(&src.Misses))
atomic.AddUint64(&dst.Collisions, atomic.LoadUint64(&src.Collisions))
atomic.AddUint64(&dst.Corruptions, atomic.LoadUint64(&src.Corruptions))
// Do not add EntriesCount, BytesSize and MaxBytesSize, since these metrics
// are calculated from c.curr and c.prev caches.
}
// Get appends the found value for the given key to dst and returns the result.
func (c *Cache) Get(dst, key []byte) []byte {
atomic.AddUint64(&c.cs.GetCalls, 1)
curr := c.curr.Load().(*fastcache.Cache)
result := curr.Get(dst, key)
if len(result) > len(dst) {
@@ -291,7 +372,6 @@ func (c *Cache) Get(dst, key []byte) []byte {
}
if c.loadMode() == whole {
// Nothing found.
atomic.AddUint64(&c.cs.Misses, 1)
return result
}
@@ -300,7 +380,6 @@ func (c *Cache) Get(dst, key []byte) []byte {
result = prev.Get(dst, key)
if len(result) <= len(dst) {
// Nothing found.
atomic.AddUint64(&c.cs.Misses, 1)
return result
}
// Cache the found entry in the current cache.
@@ -310,18 +389,15 @@ func (c *Cache) Get(dst, key []byte) []byte {
// Has verifies whether the cache contains the given key.
func (c *Cache) Has(key []byte) bool {
atomic.AddUint64(&c.cs.GetCalls, 1)
curr := c.curr.Load().(*fastcache.Cache)
if curr.Has(key) {
return true
}
if c.loadMode() == whole {
atomic.AddUint64(&c.cs.Misses, 1)
return false
}
prev := c.prev.Load().(*fastcache.Cache)
if !prev.Has(key) {
atomic.AddUint64(&c.cs.Misses, 1)
return false
}
// Cache the found entry in the current cache.
@@ -336,14 +412,12 @@ var tmpBufPool bytesutil.ByteBufferPool
// Set sets the given value for the given key.
func (c *Cache) Set(key, value []byte) {
atomic.AddUint64(&c.cs.SetCalls, 1)
curr := c.curr.Load().(*fastcache.Cache)
curr.Set(key, value)
}
// GetBig appends the found value for the given key to dst and returns the result.
func (c *Cache) GetBig(dst, key []byte) []byte {
atomic.AddUint64(&c.cs.GetCalls, 1)
curr := c.curr.Load().(*fastcache.Cache)
result := curr.GetBig(dst, key)
if len(result) > len(dst) {
@@ -352,7 +426,6 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
}
if c.loadMode() == whole {
// Nothing found.
atomic.AddUint64(&c.cs.Misses, 1)
return result
}
@@ -361,7 +434,6 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
result = prev.GetBig(dst, key)
if len(result) <= len(dst) {
// Nothing found.
atomic.AddUint64(&c.cs.Misses, 1)
return result
}
// Cache the found entry in the current cache.
@@ -371,7 +443,11 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
// SetBig sets the given value for the given key.
func (c *Cache) SetBig(key, value []byte) {
atomic.AddUint64(&c.cs.SetCalls, 1)
curr := c.curr.Load().(*fastcache.Cache)
curr.SetBig(key, value)
}
func timeJitter(d time.Duration) time.Duration {
n := float64(time.Now().UnixNano()%1e9) / 1e9
return time.Duration(float64(d) * n)
}
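
`timeJitter` turns the sub-second part of the wall clock into a pseudo-random fraction in [0, 1) and scales the given duration by it; the watchers above use it to de-synchronize tickers that would otherwise fire in lockstep. A hedged usage sketch:

```go
package main

import (
	"fmt"
	"time"
)

// timeJitter, as defined above: a fraction of d derived from the
// current nanosecond clock.
func timeJitter(d time.Duration) time.Duration {
	n := float64(time.Now().UnixNano()%1e9) / 1e9
	return time.Duration(float64(d) * n)
}

func main() {
	base := 10 * time.Second
	// Each process ends up with an interval in [10s, 11s), so tickers
	// created at the same moment drift apart instead of aligning.
	interval := base + timeJitter(base/10)
	fmt.Println(interval)
}
```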