mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
532 lines
14 KiB
Go
532 lines
14 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
|
)
|
|
|
|
// StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats().
|
|
type StorageStats struct {
|
|
// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed
|
|
RowsDroppedTooBigTimestamp uint64
|
|
|
|
// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed
|
|
RowsDroppedTooSmallTimestamp uint64
|
|
|
|
// PartitionsCount is the number of partitions in the storage
|
|
PartitionsCount uint64
|
|
|
|
PartitionStats
|
|
}
|
|
|
|
// Reset resets s.
|
|
func (s *StorageStats) Reset() {
|
|
*s = StorageStats{}
|
|
}
|
|
|
|
// StorageConfig is the config for the Storage.
|
|
type StorageConfig struct {
|
|
// Retention is the retention for the ingested data.
|
|
//
|
|
// Older data is automatically deleted.
|
|
Retention time.Duration
|
|
|
|
// FlushInterval is the interval for flushing the in-memory data to disk at the Storage
|
|
FlushInterval time.Duration
|
|
|
|
// FutureRetention is the allowed retention from the current time to future for the ingested data.
|
|
//
|
|
// Log entries with timestamps bigger than now+FutureRetention are ignored.
|
|
FutureRetention time.Duration
|
|
|
|
// LogNewStreams indicates whether to log newly created log streams.
|
|
//
|
|
// This can be useful for debugging of high cardinality issues.
|
|
// https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality
|
|
LogNewStreams bool
|
|
|
|
// LogIngestedRows indicates whether to log the ingested log entries.
|
|
//
|
|
// This can be useful for debugging of data ingestion.
|
|
LogIngestedRows bool
|
|
}
|
|
|
|
// Storage is the storage for log entries.
|
|
type Storage struct {
|
|
rowsDroppedTooBigTimestamp uint64
|
|
rowsDroppedTooSmallTimestamp uint64
|
|
|
|
// path is the path to the Storage directory
|
|
path string
|
|
|
|
// retention is the retention for the stored data
|
|
//
|
|
// older data is automatically deleted
|
|
retention time.Duration
|
|
|
|
// flushInterval is the interval for flushing in-memory data to disk
|
|
flushInterval time.Duration
|
|
|
|
// futureRetention is the maximum allowed interval to write data into the future
|
|
futureRetention time.Duration
|
|
|
|
// logNewStreams instructs to log new streams if it is set to true
|
|
logNewStreams bool
|
|
|
|
// logIngestedRows instructs to log all the ingested log entries if it is set to true
|
|
logIngestedRows bool
|
|
|
|
// flockF is a file, which makes sure that the Storage is opened by a single process
|
|
flockF *os.File
|
|
|
|
// partitions is a list of partitions for the Storage.
|
|
//
|
|
// It must be accessed under partitionsLock.
|
|
partitions []*partitionWrapper
|
|
|
|
// ptwHot is the "hot" partition, were the last rows were ingested.
|
|
//
|
|
// It must be accessed under partitionsLock.
|
|
ptwHot *partitionWrapper
|
|
|
|
// partitionsLock protects partitions and ptwHot.
|
|
partitionsLock sync.Mutex
|
|
|
|
// stopCh is closed when the Storage must be stopped.
|
|
stopCh chan struct{}
|
|
|
|
// wg is used for waiting for background workers at MustClose().
|
|
wg sync.WaitGroup
|
|
|
|
// streamIDCache caches (partition, streamIDs) seen during data ingestion.
|
|
//
|
|
// It reduces the load on persistent storage during data ingestion by skipping
|
|
// the check whether the given stream is already registered in the persistent storage.
|
|
streamIDCache *workingsetcache.Cache
|
|
|
|
// streamTagsCache caches StreamTags entries keyed by streamID.
|
|
//
|
|
// There is no need to put partition into the key for StreamTags,
|
|
// since StreamTags are uniquely identified by streamID.
|
|
//
|
|
// It reduces the load on persistent storage during querying
|
|
// when StreamTags must be found for the particular streamID
|
|
streamTagsCache *workingsetcache.Cache
|
|
|
|
// streamFilterCache caches streamIDs keyed by (partition, []TenanID, StreamFilter).
|
|
//
|
|
// It reduces the load on persistent storage during querying by _stream:{...} filter.
|
|
streamFilterCache *workingsetcache.Cache
|
|
}
|
|
|
|
type partitionWrapper struct {
|
|
// refCount is the number of active references to p.
|
|
// When it reaches zero, then the p is closed.
|
|
refCount int32
|
|
|
|
// The flag, which is set when the partition must be deleted after refCount reaches zero.
|
|
mustBeDeleted uint32
|
|
|
|
// day is the day for the partition in the unix timestamp divided by the number of seconds in the day.
|
|
day int64
|
|
|
|
// pt is the wrapped partition.
|
|
pt *partition
|
|
}
|
|
|
|
func newPartitionWrapper(pt *partition, day int64) *partitionWrapper {
|
|
pw := &partitionWrapper{
|
|
day: day,
|
|
pt: pt,
|
|
}
|
|
pw.incRef()
|
|
return pw
|
|
}
|
|
|
|
func (ptw *partitionWrapper) incRef() {
|
|
atomic.AddInt32(&ptw.refCount, 1)
|
|
}
|
|
|
|
func (ptw *partitionWrapper) decRef() {
|
|
n := atomic.AddInt32(&ptw.refCount, -1)
|
|
if n > 0 {
|
|
return
|
|
}
|
|
|
|
deletePath := ""
|
|
if atomic.LoadUint32(&ptw.mustBeDeleted) != 0 {
|
|
deletePath = ptw.pt.path
|
|
}
|
|
|
|
// Close pw.pt, since nobody refers to it.
|
|
mustClosePartition(ptw.pt)
|
|
ptw.pt = nil
|
|
|
|
// Delete partition if needed.
|
|
if deletePath != "" {
|
|
mustDeletePartition(deletePath)
|
|
}
|
|
}
|
|
|
|
func (ptw *partitionWrapper) canAddAllRows(lr *LogRows) bool {
|
|
minTimestamp := ptw.day * nsecPerDay
|
|
maxTimestamp := minTimestamp + nsecPerDay - 1
|
|
for _, ts := range lr.timestamps {
|
|
if ts < minTimestamp || ts > maxTimestamp {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// mustCreateStorage creates Storage at the given path.
|
|
func mustCreateStorage(path string) {
|
|
fs.MustMkdirFailIfExist(path)
|
|
|
|
partitionsPath := filepath.Join(path, partitionsDirname)
|
|
fs.MustMkdirFailIfExist(partitionsPath)
|
|
}
|
|
|
|
// MustOpenStorage opens Storage at the given path.
|
|
//
|
|
// MustClose must be called on the returned Storage when it is no longer needed.
|
|
func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
|
|
flushInterval := cfg.FlushInterval
|
|
if flushInterval < time.Second {
|
|
flushInterval = time.Second
|
|
}
|
|
|
|
retention := cfg.Retention
|
|
if retention < 24*time.Hour {
|
|
retention = 24 * time.Hour
|
|
}
|
|
|
|
futureRetention := cfg.FutureRetention
|
|
if futureRetention < 24*time.Hour {
|
|
futureRetention = 24 * time.Hour
|
|
}
|
|
|
|
if !fs.IsPathExist(path) {
|
|
mustCreateStorage(path)
|
|
}
|
|
|
|
flockF := fs.MustCreateFlockFile(path)
|
|
|
|
// Load caches
|
|
mem := memory.Allowed()
|
|
streamIDCachePath := filepath.Join(path, cacheDirname, streamIDCacheFilename)
|
|
streamIDCache := workingsetcache.Load(streamIDCachePath, mem/16)
|
|
|
|
streamTagsCache := workingsetcache.New(mem / 10)
|
|
|
|
streamFilterCache := workingsetcache.New(mem / 10)
|
|
|
|
s := &Storage{
|
|
path: path,
|
|
retention: retention,
|
|
flushInterval: flushInterval,
|
|
futureRetention: futureRetention,
|
|
logNewStreams: cfg.LogNewStreams,
|
|
logIngestedRows: cfg.LogIngestedRows,
|
|
flockF: flockF,
|
|
stopCh: make(chan struct{}),
|
|
|
|
streamIDCache: streamIDCache,
|
|
streamTagsCache: streamTagsCache,
|
|
streamFilterCache: streamFilterCache,
|
|
}
|
|
|
|
partitionsPath := filepath.Join(path, partitionsDirname)
|
|
fs.MustMkdirIfNotExist(partitionsPath)
|
|
des := fs.MustReadDir(partitionsPath)
|
|
ptws := make([]*partitionWrapper, len(des))
|
|
for i, de := range des {
|
|
fname := de.Name()
|
|
|
|
// Parse the day for the partition
|
|
t, err := time.Parse(partitionNameFormat, fname)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot parse partition filename %q at %q; it must be in the form YYYYMMDD: %s", fname, partitionsPath, err)
|
|
}
|
|
day := t.UTC().UnixNano() / nsecPerDay
|
|
|
|
partitionPath := filepath.Join(partitionsPath, fname)
|
|
pt := mustOpenPartition(s, partitionPath)
|
|
ptws[i] = newPartitionWrapper(pt, day)
|
|
}
|
|
sort.Slice(ptws, func(i, j int) bool {
|
|
return ptws[i].day < ptws[j].day
|
|
})
|
|
|
|
// Delete partitions from the future if needed
|
|
maxAllowedDay := s.getMaxAllowedDay()
|
|
j := len(ptws) - 1
|
|
for j >= 0 {
|
|
ptw := ptws[j]
|
|
if ptw.day <= maxAllowedDay {
|
|
break
|
|
}
|
|
logger.Infof("the partition %s is scheduled to be deleted because it is outside the -futureRetention=%dd", ptw.pt.path, durationToDays(s.futureRetention))
|
|
atomic.StoreUint32(&ptw.mustBeDeleted, 1)
|
|
ptw.decRef()
|
|
j--
|
|
}
|
|
j++
|
|
for i := j; i < len(ptws); i++ {
|
|
ptws[i] = nil
|
|
}
|
|
ptws = ptws[:j]
|
|
|
|
s.partitions = ptws
|
|
s.runRetentionWatcher()
|
|
return s
|
|
}
|
|
|
|
const partitionNameFormat = "20060102"
|
|
|
|
func (s *Storage) runRetentionWatcher() {
|
|
s.wg.Add(1)
|
|
go func() {
|
|
s.watchRetention()
|
|
s.wg.Done()
|
|
}()
|
|
}
|
|
|
|
func (s *Storage) watchRetention() {
|
|
ticker := time.NewTicker(time.Hour)
|
|
defer ticker.Stop()
|
|
for {
|
|
var ptwsToDelete []*partitionWrapper
|
|
minAllowedDay := s.getMinAllowedDay()
|
|
|
|
s.partitionsLock.Lock()
|
|
|
|
// Delete outdated partitions.
|
|
// s.partitions are sorted by day, so the partitions, which can become outdated, are located at the beginning of the list
|
|
for _, ptw := range s.partitions {
|
|
if ptw.day >= minAllowedDay {
|
|
break
|
|
}
|
|
ptwsToDelete = append(ptwsToDelete, ptw)
|
|
}
|
|
for i := range ptwsToDelete {
|
|
s.partitions[i] = nil
|
|
}
|
|
s.partitions = s.partitions[len(ptwsToDelete):]
|
|
|
|
s.partitionsLock.Unlock()
|
|
|
|
for _, ptw := range ptwsToDelete {
|
|
logger.Infof("the partition %s is scheduled to be deleted because it is outside the -retentionPeriod=%dd", ptw.pt.path, durationToDays(s.retention))
|
|
atomic.StoreUint32(&ptw.mustBeDeleted, 1)
|
|
ptw.decRef()
|
|
}
|
|
|
|
select {
|
|
case <-s.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Storage) getMinAllowedDay() int64 {
|
|
return time.Now().UTC().Add(-s.retention).UnixNano() / nsecPerDay
|
|
}
|
|
|
|
func (s *Storage) getMaxAllowedDay() int64 {
|
|
return time.Now().UTC().Add(s.futureRetention).UnixNano() / nsecPerDay
|
|
}
|
|
|
|
// MustClose closes s.
|
|
//
|
|
// It is expected that nobody uses the storage at the close time.
|
|
func (s *Storage) MustClose() {
|
|
// Stop background workers
|
|
close(s.stopCh)
|
|
s.wg.Wait()
|
|
|
|
// Close partitions
|
|
for _, pw := range s.partitions {
|
|
pw.decRef()
|
|
if pw.refCount != 0 {
|
|
logger.Panicf("BUG: there are %d users of partition", pw.refCount)
|
|
}
|
|
}
|
|
s.partitions = nil
|
|
|
|
// Save caches
|
|
streamIDCachePath := filepath.Join(s.path, cacheDirname, streamIDCacheFilename)
|
|
if err := s.streamIDCache.Save(streamIDCachePath); err != nil {
|
|
logger.Panicf("FATAL: cannot save streamID cache to %q: %s", streamIDCachePath, err)
|
|
}
|
|
s.streamIDCache.Stop()
|
|
s.streamIDCache = nil
|
|
|
|
s.streamTagsCache.Stop()
|
|
s.streamTagsCache = nil
|
|
|
|
s.streamFilterCache.Stop()
|
|
s.streamFilterCache = nil
|
|
|
|
// release lock file
|
|
fs.MustClose(s.flockF)
|
|
s.flockF = nil
|
|
|
|
s.path = ""
|
|
}
|
|
|
|
// MustAddRows adds lr to s.
|
|
func (s *Storage) MustAddRows(lr *LogRows) {
|
|
// Fast path - try adding all the rows to the hot partition
|
|
s.partitionsLock.Lock()
|
|
ptwHot := s.ptwHot
|
|
if ptwHot != nil {
|
|
ptwHot.incRef()
|
|
}
|
|
s.partitionsLock.Unlock()
|
|
|
|
if ptwHot != nil {
|
|
if ptwHot.canAddAllRows(lr) {
|
|
ptwHot.pt.mustAddRows(lr)
|
|
ptwHot.decRef()
|
|
return
|
|
}
|
|
ptwHot.decRef()
|
|
}
|
|
|
|
// Slow path - rows cannot be added to the hot partition, so split rows among available partitions
|
|
minAllowedDay := s.getMinAllowedDay()
|
|
maxAllowedDay := s.getMaxAllowedDay()
|
|
m := make(map[int64]*LogRows)
|
|
for i, ts := range lr.timestamps {
|
|
day := ts / nsecPerDay
|
|
if day < minAllowedDay {
|
|
rf := RowFormatter(lr.rows[i])
|
|
tsf := TimeFormatter(ts)
|
|
minAllowedTsf := TimeFormatter(minAllowedDay * nsecPerDay)
|
|
tooSmallTimestampLogger.Warnf("skipping log entry with too small timestamp=%s; it must be bigger than %s according "+
|
|
"to the configured -retentionPeriod. See https://docs.victoriametrics.com/VictoriaLogs/#retention ; "+
|
|
"log entry: %s", &tsf, &minAllowedTsf, &rf)
|
|
atomic.AddUint64(&s.rowsDroppedTooSmallTimestamp, 1)
|
|
continue
|
|
}
|
|
if day > maxAllowedDay {
|
|
rf := RowFormatter(lr.rows[i])
|
|
tsf := TimeFormatter(ts)
|
|
maxAllowedTsf := TimeFormatter(maxAllowedDay * nsecPerDay)
|
|
tooBigTimestampLogger.Warnf("skipping log entry with too big timestamp=%s; it must be smaller than %s according "+
|
|
"to the configured -futureRetention; see https://docs.victoriametrics.com/VictoriaLogs/#retention ; "+
|
|
"log entry: %s", &tsf, &maxAllowedTsf, &rf)
|
|
atomic.AddUint64(&s.rowsDroppedTooBigTimestamp, 1)
|
|
continue
|
|
}
|
|
lrPart := m[day]
|
|
if lrPart == nil {
|
|
lrPart = GetLogRows(nil, nil)
|
|
m[day] = lrPart
|
|
}
|
|
lrPart.mustAddInternal(lr.streamIDs[i], ts, lr.rows[i], lr.streamTagsCanonicals[i])
|
|
}
|
|
for day, lrPart := range m {
|
|
ptw := s.getPartitionForDay(day)
|
|
ptw.pt.mustAddRows(lrPart)
|
|
ptw.decRef()
|
|
PutLogRows(lrPart)
|
|
}
|
|
}
|
|
|
|
var tooSmallTimestampLogger = logger.WithThrottler("too_small_timestamp", 5*time.Second)
|
|
var tooBigTimestampLogger = logger.WithThrottler("too_big_timestamp", 5*time.Second)
|
|
|
|
const nsecPerDay = 24 * 3600 * 1e9
|
|
|
|
// TimeFormatter implements fmt.Stringer for timestamp in nanoseconds
|
|
type TimeFormatter int64
|
|
|
|
// String returns human-readable representation for tf.
|
|
func (tf *TimeFormatter) String() string {
|
|
ts := int64(*tf)
|
|
t := time.Unix(0, ts).UTC()
|
|
return t.Format(time.RFC3339Nano)
|
|
}
|
|
|
|
func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
|
|
s.partitionsLock.Lock()
|
|
|
|
// Search for the partition using binary search
|
|
ptws := s.partitions
|
|
n := sort.Search(len(ptws), func(i int) bool {
|
|
return ptws[i].day >= day
|
|
})
|
|
var ptw *partitionWrapper
|
|
if n < len(ptws) {
|
|
ptw = ptws[n]
|
|
if ptw.day != day {
|
|
ptw = nil
|
|
}
|
|
}
|
|
if ptw == nil {
|
|
// Missing partition for the given day. Create it.
|
|
fname := time.Unix(0, day*nsecPerDay).UTC().Format(partitionNameFormat)
|
|
partitionPath := filepath.Join(s.path, partitionsDirname, fname)
|
|
mustCreatePartition(partitionPath)
|
|
|
|
pt := mustOpenPartition(s, partitionPath)
|
|
ptw = newPartitionWrapper(pt, day)
|
|
if n == len(ptws) {
|
|
ptws = append(ptws, ptw)
|
|
} else {
|
|
ptws = append(ptws[:n+1], ptws[n:]...)
|
|
ptws[n] = ptw
|
|
}
|
|
s.partitions = ptws
|
|
}
|
|
|
|
s.ptwHot = ptw
|
|
ptw.incRef()
|
|
|
|
s.partitionsLock.Unlock()
|
|
|
|
return ptw
|
|
}
|
|
|
|
// UpdateStats updates ss for the given s.
|
|
func (s *Storage) UpdateStats(ss *StorageStats) {
|
|
ss.RowsDroppedTooBigTimestamp += atomic.LoadUint64(&s.rowsDroppedTooBigTimestamp)
|
|
ss.RowsDroppedTooSmallTimestamp += atomic.LoadUint64(&s.rowsDroppedTooSmallTimestamp)
|
|
|
|
s.partitionsLock.Lock()
|
|
ss.PartitionsCount += uint64(len(s.partitions))
|
|
for _, ptw := range s.partitions {
|
|
ptw.pt.updateStats(&ss.PartitionStats)
|
|
}
|
|
s.partitionsLock.Unlock()
|
|
}
|
|
|
|
func (s *Storage) debugFlush() {
|
|
s.partitionsLock.Lock()
|
|
ptws := append([]*partitionWrapper{}, s.partitions...)
|
|
for _, ptw := range ptws {
|
|
ptw.incRef()
|
|
}
|
|
s.partitionsLock.Unlock()
|
|
|
|
for _, ptw := range ptws {
|
|
ptw.pt.debugFlush()
|
|
ptw.decRef()
|
|
}
|
|
}
|
|
|
|
func durationToDays(d time.Duration) int64 {
|
|
return int64(d / (time.Hour * 24))
|
|
}
|