mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: reuse timestamp blocks for adjancent metric blocks with identical timestamps
This should reduce disk space usage when scraping targets containing metrics with identical names such as `node_cpu_seconds_total`, histograms, quantiles, etc. Expose `vm_timestamps_blocks_merged_total` and `vm_timestamps_bytes_saved_total` metrics for monitoring the effectiveness of timestamp blocks merging.
This commit is contained in:
parent
d7c04db1fc
commit
9d8fdff6c5
4 changed files with 68 additions and 11 deletions
|
@ -456,6 +456,13 @@ func registerStorageMetrics() {
|
|||
return float64(m().SlowMetricNameLoads)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_timestamps_blocks_merged_total`, func() float64 {
|
||||
return float64(m().TimestampsBlocksMerged)
|
||||
})
|
||||
metrics.NewGauge(`vm_timestamps_bytes_saved_total`, func() float64 {
|
||||
return float64(m().TimestampsBytesSaved)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
|
||||
return float64(tm().BigRowsCount)
|
||||
})
|
||||
|
|
|
@ -51,6 +51,9 @@ type blockStreamReader struct {
|
|||
valuesBlockOffset uint64
|
||||
indexBlockOffset uint64
|
||||
|
||||
prevTimestampsBlockOffset uint64
|
||||
prevTimestampsData []byte
|
||||
|
||||
indexData []byte
|
||||
compressedIndexData []byte
|
||||
|
||||
|
@ -87,6 +90,9 @@ func (bsr *blockStreamReader) reset() {
|
|||
bsr.valuesBlockOffset = 0
|
||||
bsr.indexBlockOffset = 0
|
||||
|
||||
bsr.prevTimestampsBlockOffset = 0
|
||||
bsr.prevTimestampsData = bsr.prevTimestampsData[:0]
|
||||
|
||||
bsr.indexData = bsr.indexData[:0]
|
||||
bsr.compressedIndexData = bsr.compressedIndexData[:0]
|
||||
|
||||
|
@ -275,7 +281,13 @@ func (bsr *blockStreamReader) readBlock() error {
|
|||
return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d",
|
||||
bsr.prevIndexBlockOffset(), bsr.Block.bh.MaxTimestamp, bsr.ph.MaxTimestamp)
|
||||
}
|
||||
if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset {
|
||||
usePrevTimestamps := len(bsr.prevTimestampsData) > 0 && bsr.Block.bh.TimestampsBlockOffset == bsr.prevTimestampsBlockOffset
|
||||
if usePrevTimestamps {
|
||||
if int(bsr.Block.bh.TimestampsBlockSize) != len(bsr.prevTimestampsData) {
|
||||
return fmt.Errorf("invalid TimestampsBlockSize at block header at offset %d; got %d; want %d",
|
||||
bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockSize, len(bsr.prevTimestampsData))
|
||||
}
|
||||
} else if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset {
|
||||
return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d",
|
||||
bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset)
|
||||
}
|
||||
|
@ -285,9 +297,15 @@ func (bsr *blockStreamReader) readBlock() error {
|
|||
}
|
||||
|
||||
// Read timestamps data.
|
||||
bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
|
||||
if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
|
||||
return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
|
||||
if usePrevTimestamps {
|
||||
bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...)
|
||||
} else {
|
||||
bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
|
||||
if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
|
||||
return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
|
||||
}
|
||||
bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset
|
||||
bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...)
|
||||
}
|
||||
|
||||
// Read values data.
|
||||
|
@ -297,7 +315,9 @@ func (bsr *blockStreamReader) readBlock() error {
|
|||
}
|
||||
|
||||
// Update offsets.
|
||||
bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize)
|
||||
if !usePrevTimestamps {
|
||||
bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize)
|
||||
}
|
||||
bsr.valuesBlockOffset += uint64(bsr.Block.bh.ValuesBlockSize)
|
||||
bsr.indexBlockHeadersCount++
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
@ -38,6 +39,13 @@ type blockStreamWriter struct {
|
|||
|
||||
metaindexData []byte
|
||||
compressedMetaindexData []byte
|
||||
|
||||
// prevTimestamps* is used as an optimization for reducing disk space usage
|
||||
// when serially written blocks have identical timestamps.
|
||||
// This is usually the case when adjancent blocks contain metrics scraped from the same target,
|
||||
// since such metrics have identical timestamps.
|
||||
prevTimestampsData []byte
|
||||
prevTimestampsBlockOffset uint64
|
||||
}
|
||||
|
||||
func (bsw *blockStreamWriter) assertWriteClosers() {
|
||||
|
@ -66,6 +74,9 @@ func (bsw *blockStreamWriter) reset() {
|
|||
|
||||
bsw.metaindexData = bsw.metaindexData[:0]
|
||||
bsw.compressedMetaindexData = bsw.compressedMetaindexData[:0]
|
||||
|
||||
bsw.prevTimestampsData = bsw.prevTimestampsData[:0]
|
||||
bsw.prevTimestampsBlockOffset = 0
|
||||
}
|
||||
|
||||
// InitFromInmemoryPart initialzes bsw from inmemory part.
|
||||
|
@ -177,22 +188,35 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM
|
|||
atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
|
||||
b.deduplicateSamplesDuringMerge()
|
||||
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
|
||||
|
||||
usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)
|
||||
if usePrevTimestamps {
|
||||
// The current timestamps block equals to the previous timestamps block.
|
||||
// Update headerData so it points to the previous timestamps block. This saves disk space.
|
||||
headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset)
|
||||
atomic.AddUint64(×tampsBlocksMerged, 1)
|
||||
atomic.AddUint64(×tampsBytesSaved, uint64(len(timestampsData)))
|
||||
}
|
||||
bsw.indexData = append(bsw.indexData, headerData...)
|
||||
bsw.mr.RegisterBlockHeader(&b.bh)
|
||||
if len(bsw.indexData) >= maxBlockSize {
|
||||
bsw.flushIndexData()
|
||||
}
|
||||
|
||||
fs.MustWriteData(bsw.timestampsWriter, timestampsData)
|
||||
bsw.timestampsBlockOffset += uint64(len(timestampsData))
|
||||
|
||||
if !usePrevTimestamps {
|
||||
bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...)
|
||||
bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset
|
||||
fs.MustWriteData(bsw.timestampsWriter, timestampsData)
|
||||
bsw.timestampsBlockOffset += uint64(len(timestampsData))
|
||||
}
|
||||
fs.MustWriteData(bsw.valuesWriter, valuesData)
|
||||
bsw.valuesBlockOffset += uint64(len(valuesData))
|
||||
|
||||
updatePartHeader(b, ph)
|
||||
}
|
||||
|
||||
var (
|
||||
timestampsBlocksMerged uint64
|
||||
timestampsBytesSaved uint64
|
||||
)
|
||||
|
||||
func updatePartHeader(b *Block, ph *partHeader) {
|
||||
ph.BlocksCount++
|
||||
ph.RowsCount += uint64(b.bh.RowsCount)
|
||||
|
|
|
@ -340,6 +340,9 @@ type Metrics struct {
|
|||
SlowPerDayIndexInserts uint64
|
||||
SlowMetricNameLoads uint64
|
||||
|
||||
TimestampsBlocksMerged uint64
|
||||
TimestampsBytesSaved uint64
|
||||
|
||||
TSIDCacheSize uint64
|
||||
TSIDCacheSizeBytes uint64
|
||||
TSIDCacheRequests uint64
|
||||
|
@ -405,6 +408,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
|
||||
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
|
||||
|
||||
m.TimestampsBlocksMerged = atomic.LoadUint64(×tampsBlocksMerged)
|
||||
m.TimestampsBytesSaved = atomic.LoadUint64(×tampsBytesSaved)
|
||||
|
||||
var cs fastcache.Stats
|
||||
s.tsidCache.UpdateStats(&cs)
|
||||
m.TSIDCacheSize += cs.EntriesCount
|
||||
|
|
Loading…
Reference in a new issue