2019-05-22 21:16:55 +00:00
package storage
import (
2021-03-18 12:52:49 +00:00
"bytes"
2019-05-22 21:16:55 +00:00
"fmt"
2020-01-30 22:54:28 +00:00
"io"
2019-06-14 04:52:32 +00:00
"io/ioutil"
2019-05-22 21:16:55 +00:00
"math"
"os"
"path/filepath"
"regexp"
"sort"
2021-02-02 22:24:05 +00:00
"strings"
2019-05-22 21:16:55 +00:00
"sync"
"sync/atomic"
"time"
"unsafe"
2021-05-20 11:15:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-14 19:01:51 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2020-07-22 21:58:48 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
2019-05-28 14:17:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-09-24 18:10:22 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
2019-08-13 18:35:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/fastcache"
)
2020-10-20 11:29:26 +00:00
const (
msecsPerMonth = 31 * 24 * 3600 * 1000
maxRetentionMsecs = 100 * 12 * msecsPerMonth
)
2019-05-22 21:16:55 +00:00
// Storage represents TSDB storage.
type Storage struct {
2019-10-17 15:22:56 +00:00
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
2020-08-05 15:24:51 +00:00
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
2020-05-15 10:44:23 +00:00
slowRowInserts uint64
slowPerDayIndexInserts uint64
2020-05-15 11:11:39 +00:00
slowMetricNameLoads uint64
2020-05-15 10:44:23 +00:00
2021-05-20 11:15:19 +00:00
hourlySeriesLimitRowsDropped uint64
dailySeriesLimitRowsDropped uint64
2020-10-20 13:10:46 +00:00
path string
cachePath string
retentionMsecs int64
2019-05-22 21:16:55 +00:00
2019-05-25 18:51:11 +00:00
// lock file for exclusive access to the storage on the given path.
2019-05-22 21:16:55 +00:00
flockF * os . File
idbCurr atomic . Value
tb * table
2021-05-20 11:15:19 +00:00
// Series cardinality limiters.
hourlySeriesLimiter * bloomfilter . Limiter
dailySeriesLimiter * bloomfilter . Limiter
2019-05-22 21:16:55 +00:00
// tsidCache is MetricName -> TSID cache.
2019-08-13 18:35:19 +00:00
tsidCache * workingsetcache . Cache
2019-05-22 21:16:55 +00:00
// metricIDCache is MetricID -> TSID cache.
2019-08-13 18:35:19 +00:00
metricIDCache * workingsetcache . Cache
2019-05-22 21:16:55 +00:00
// metricNameCache is MetricID -> MetricName cache.
2019-08-13 18:35:19 +00:00
metricNameCache * workingsetcache . Cache
2019-05-22 21:16:55 +00:00
// dateMetricIDCache is (Date, MetricID) cache.
2019-11-09 21:05:14 +00:00
dateMetricIDCache * dateMetricIDCache
2019-05-22 21:16:55 +00:00
2019-11-11 22:16:42 +00:00
// Fast cache for MetricID values occurred during the current hour.
2019-06-09 16:06:53 +00:00
currHourMetricIDs atomic . Value
2019-11-11 22:16:42 +00:00
// Fast cache for MetricID values occurred during the previous hour.
2019-06-09 16:06:53 +00:00
prevHourMetricIDs atomic . Value
2020-05-11 22:06:17 +00:00
// Fast cache for pre-populating per-day inverted index for the next day.
// This is needed in order to remove CPU usage spikes at 00:00 UTC
// due to creation of per-day inverted index for active time series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
nextDayMetricIDs atomic . Value
2019-06-09 16:06:53 +00:00
// Pending MetricID values to be added to currHourMetricIDs.
2019-11-08 17:37:16 +00:00
pendingHourEntriesLock sync . Mutex
pendingHourEntries * uint64set . Set
2019-06-02 15:34:08 +00:00
2020-05-11 22:06:17 +00:00
// Pending MetricIDs to be added to nextDayMetricIDs.
pendingNextDayMetricIDsLock sync . Mutex
pendingNextDayMetricIDs * uint64set . Set
2020-01-29 23:59:43 +00:00
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic . Value
2019-06-02 15:34:08 +00:00
stop chan struct { }
2020-08-06 13:48:21 +00:00
currHourMetricIDsUpdaterWG sync . WaitGroup
nextDayMetricIDsUpdaterWG sync . WaitGroup
retentionWatcherWG sync . WaitGroup
2020-03-24 20:24:54 +00:00
// The snapshotLock prevents from concurrent creation of snapshots,
// since this may result in snapshots without recently added data,
// which may be in the process of flushing to disk by concurrently running
// snapshot process.
snapshotLock sync . Mutex
2021-02-10 12:37:14 +00:00
// The minimum timestamp when composite index search can be used.
minTimestampForCompositeIndex int64
2019-05-22 21:16:55 +00:00
}
2020-10-20 11:29:26 +00:00
// OpenStorage opens storage on the given path with the given retentionMsecs.
2021-05-20 11:15:19 +00:00
func OpenStorage ( path string , retentionMsecs int64 , maxHourlySeries , maxDailySeries int ) ( * Storage , error ) {
2019-05-22 21:16:55 +00:00
path , err := filepath . Abs ( path )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot determine absolute path for %q: %w" , path , err )
2019-05-22 21:16:55 +00:00
}
2020-10-20 11:29:26 +00:00
if retentionMsecs <= 0 {
retentionMsecs = maxRetentionMsecs
}
2021-02-15 12:30:12 +00:00
if retentionMsecs > maxRetentionMsecs {
retentionMsecs = maxRetentionMsecs
}
2019-05-22 21:16:55 +00:00
s := & Storage {
2020-10-20 13:10:46 +00:00
path : path ,
cachePath : path + "/cache" ,
retentionMsecs : retentionMsecs ,
2019-05-22 21:16:55 +00:00
stop : make ( chan struct { } ) ,
}
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot create a directory for the storage at %q: %w" , path , err )
2019-05-22 21:16:55 +00:00
}
2019-08-12 22:45:22 +00:00
// Protect from concurrent opens.
flockF , err := fs . CreateFlockFile ( path )
2019-05-22 21:16:55 +00:00
if err != nil {
2019-08-12 22:45:22 +00:00
return nil , err
2019-05-22 21:16:55 +00:00
}
s . flockF = flockF
2021-02-10 12:37:14 +00:00
// Pre-create snapshots directory if it is missing.
snapshotsPath := path + "/snapshots"
if err := fs . MkdirAllIfNotExist ( snapshotsPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot create %q: %w" , snapshotsPath , err )
}
2021-05-20 11:15:19 +00:00
// Initialize series cardinality limiter.
if maxHourlySeries > 0 {
s . hourlySeriesLimiter = bloomfilter . NewLimiter ( maxHourlySeries , time . Hour )
}
if maxDailySeries > 0 {
s . dailySeriesLimiter = bloomfilter . NewLimiter ( maxDailySeries , 24 * time . Hour )
}
2019-05-22 21:16:55 +00:00
// Load caches.
mem := memory . Allowed ( )
s . tsidCache = s . mustLoadCache ( "MetricName->TSID" , "metricName_tsid" , mem / 3 )
s . metricIDCache = s . mustLoadCache ( "MetricID->TSID" , "metricID_tsid" , mem / 16 )
s . metricNameCache = s . mustLoadCache ( "MetricID->MetricName" , "metricID_metricName" , mem / 8 )
2019-11-09 21:05:14 +00:00
s . dateMetricIDCache = newDateMetricIDCache ( )
2019-05-22 21:16:55 +00:00
2020-05-14 19:01:51 +00:00
hour := fasttime . UnixHour ( )
2019-06-14 04:52:32 +00:00
hmCurr := s . mustLoadHourMetricIDs ( hour , "curr_hour_metric_ids" )
hmPrev := s . mustLoadHourMetricIDs ( hour - 1 , "prev_hour_metric_ids" )
s . currHourMetricIDs . Store ( hmCurr )
s . prevHourMetricIDs . Store ( hmPrev )
2019-11-08 17:37:16 +00:00
s . pendingHourEntries = & uint64set . Set { }
2019-06-14 04:52:32 +00:00
2020-05-14 19:01:51 +00:00
date := fasttime . UnixDate ( )
2020-05-11 22:06:17 +00:00
nextDayMetricIDs := s . mustLoadNextDayMetricIDs ( date )
s . nextDayMetricIDs . Store ( nextDayMetricIDs )
s . pendingNextDayMetricIDs = & uint64set . Set { }
2020-01-29 23:59:43 +00:00
s . prefetchedMetricIDs . Store ( & uint64set . Set { } )
2021-02-10 15:55:33 +00:00
// Load metadata
metadataDir := path + "/metadata"
isEmptyDB := ! fs . IsPathExist ( path + "/indexdb" )
if err := fs . MkdirAllIfNotExist ( metadataDir ) ; err != nil {
return nil , fmt . Errorf ( "cannot create %q: %w" , metadataDir , err )
}
s . minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex ( metadataDir , isEmptyDB )
2019-05-22 21:16:55 +00:00
// Load indexdb
idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots"
if err := fs . MkdirAllIfNotExist ( idbSnapshotsPath ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot create %q: %w" , idbSnapshotsPath , err )
2019-05-22 21:16:55 +00:00
}
2021-02-10 12:37:14 +00:00
idbCurr , idbPrev , err := openIndexDBTables ( idbPath , s . metricIDCache , s . metricNameCache , s . tsidCache , s . minTimestampForCompositeIndex )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot open indexdb tables at %q: %w" , idbPath , err )
2019-05-22 21:16:55 +00:00
}
idbCurr . SetExtDB ( idbPrev )
s . idbCurr . Store ( idbCurr )
// Load data
tablePath := path + "/data"
2020-10-20 11:29:26 +00:00
tb , err := openTable ( tablePath , s . getDeletedMetricIDs , retentionMsecs )
2019-05-22 21:16:55 +00:00
if err != nil {
s . idb ( ) . MustClose ( )
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot open table at %q: %w" , tablePath , err )
2019-05-22 21:16:55 +00:00
}
s . tb = tb
2019-06-09 16:06:53 +00:00
s . startCurrHourMetricIDsUpdater ( )
2020-05-11 22:06:17 +00:00
s . startNextDayMetricIDsUpdater ( )
2019-05-22 21:16:55 +00:00
s . startRetentionWatcher ( )
return s , nil
}
2020-11-11 12:40:27 +00:00
// DebugFlush flushes recently added storage data, so it becomes visible to search.
func ( s * Storage ) DebugFlush ( ) {
2019-05-22 21:16:55 +00:00
s . tb . flushRawRows ( )
s . idb ( ) . tb . DebugFlush ( )
}
2019-09-24 18:10:22 +00:00
func ( s * Storage ) getDeletedMetricIDs ( ) * uint64set . Set {
2019-05-22 21:16:55 +00:00
return s . idb ( ) . getDeletedMetricIDs ( )
}
// CreateSnapshot creates snapshot for s and returns the snapshot name.
func ( s * Storage ) CreateSnapshot ( ) ( string , error ) {
logger . Infof ( "creating Storage snapshot for %q..." , s . path )
startTime := time . Now ( )
2020-03-24 20:24:54 +00:00
s . snapshotLock . Lock ( )
defer s . snapshotLock . Unlock ( )
2019-05-22 21:16:55 +00:00
snapshotName := fmt . Sprintf ( "%s-%08X" , time . Now ( ) . UTC ( ) . Format ( "20060102150405" ) , nextSnapshotIdx ( ) )
srcDir := s . path
dstDir := fmt . Sprintf ( "%s/snapshots/%s" , srcDir , snapshotName )
if err := fs . MkdirAllFailIfExist ( dstDir ) ; err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create dir %q: %w" , dstDir , err )
2019-05-22 21:16:55 +00:00
}
dstDataDir := dstDir + "/data"
if err := fs . MkdirAllFailIfExist ( dstDataDir ) ; err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create dir %q: %w" , dstDataDir , err )
2019-05-22 21:16:55 +00:00
}
smallDir , bigDir , err := s . tb . CreateSnapshot ( snapshotName )
if err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create table snapshot: %w" , err )
2019-05-22 21:16:55 +00:00
}
dstSmallDir := dstDataDir + "/small"
if err := fs . SymlinkRelative ( smallDir , dstSmallDir ) ; err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %w" , smallDir , dstSmallDir , err )
2019-05-22 21:16:55 +00:00
}
dstBigDir := dstDataDir + "/big"
if err := fs . SymlinkRelative ( bigDir , dstBigDir ) ; err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %w" , bigDir , dstBigDir , err )
2019-05-22 21:16:55 +00:00
}
2019-06-11 20:13:04 +00:00
fs . MustSyncPath ( dstDataDir )
2019-05-22 21:16:55 +00:00
2021-02-10 12:37:14 +00:00
idbSnapshot := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , srcDir , snapshotName )
2019-05-22 21:16:55 +00:00
idb := s . idb ( )
currSnapshot := idbSnapshot + "/" + idb . name
if err := idb . tb . CreateSnapshotAt ( currSnapshot ) ; err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create curr indexDB snapshot: %w" , err )
2019-05-22 21:16:55 +00:00
}
ok := idb . doExtDB ( func ( extDB * indexDB ) {
prevSnapshot := idbSnapshot + "/" + extDB . name
err = extDB . tb . CreateSnapshotAt ( prevSnapshot )
} )
if ok && err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create prev indexDB snapshot: %w" , err )
2019-05-22 21:16:55 +00:00
}
dstIdbDir := dstDir + "/indexdb"
if err := fs . SymlinkRelative ( idbSnapshot , dstIdbDir ) ; err != nil {
2020-06-30 19:58:18 +00:00
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %w" , idbSnapshot , dstIdbDir , err )
2019-05-22 21:16:55 +00:00
}
2021-02-10 12:37:14 +00:00
srcMetadataDir := srcDir + "/metadata"
dstMetadataDir := dstDir + "/metadata"
if err := fs . CopyDirectory ( srcMetadataDir , dstMetadataDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot copy metadata: %s" , err )
}
2019-06-11 20:13:04 +00:00
fs . MustSyncPath ( dstDir )
2019-05-22 21:16:55 +00:00
2020-01-22 16:27:44 +00:00
logger . Infof ( "created Storage snapshot for %q at %q in %.3f seconds" , srcDir , dstDir , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 21:16:55 +00:00
return snapshotName , nil
}
var snapshotNameRegexp = regexp . MustCompile ( "^[0-9]{14}-[0-9A-Fa-f]+$" )
// ListSnapshots returns sorted list of existing snapshots for s.
func ( s * Storage ) ListSnapshots ( ) ( [ ] string , error ) {
snapshotsPath := s . path + "/snapshots"
d , err := os . Open ( snapshotsPath )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot open %q: %w" , snapshotsPath , err )
2019-05-22 21:16:55 +00:00
}
defer fs . MustClose ( d )
fnames , err := d . Readdirnames ( - 1 )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read contents of %q: %w" , snapshotsPath , err )
2019-05-22 21:16:55 +00:00
}
snapshotNames := make ( [ ] string , 0 , len ( fnames ) )
for _ , fname := range fnames {
if ! snapshotNameRegexp . MatchString ( fname ) {
continue
}
snapshotNames = append ( snapshotNames , fname )
}
sort . Strings ( snapshotNames )
return snapshotNames , nil
}
// DeleteSnapshot deletes the given snapshot.
func ( s * Storage ) DeleteSnapshot ( snapshotName string ) error {
if ! snapshotNameRegexp . MatchString ( snapshotName ) {
return fmt . Errorf ( "invalid snapshotName %q" , snapshotName )
}
snapshotPath := s . path + "/snapshots/" + snapshotName
logger . Infof ( "deleting snapshot %q..." , snapshotPath )
startTime := time . Now ( )
s . tb . MustDeleteSnapshot ( snapshotName )
idbPath := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
2019-06-11 22:53:43 +00:00
fs . MustRemoveAll ( idbPath )
fs . MustRemoveAll ( snapshotPath )
2019-05-22 21:16:55 +00:00
2020-01-22 16:27:44 +00:00
logger . Infof ( "deleted snapshot %q in %.3f seconds" , snapshotPath , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 21:16:55 +00:00
return nil
}
var snapshotIdx = uint64 ( time . Now ( ) . UnixNano ( ) )
func nextSnapshotIdx ( ) uint64 {
return atomic . AddUint64 ( & snapshotIdx , 1 )
}
func ( s * Storage ) idb ( ) * indexDB {
return s . idbCurr . Load ( ) . ( * indexDB )
}
// Metrics contains essential metrics for the Storage.
type Metrics struct {
2020-10-09 10:35:48 +00:00
RowsAddedTotal uint64
2020-02-27 21:47:05 +00:00
DedupsDuringMerge uint64
2019-07-26 11:10:25 +00:00
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
2019-08-06 11:09:17 +00:00
AddRowsConcurrencyLimitReached uint64
AddRowsConcurrencyLimitTimeout uint64
AddRowsConcurrencyDroppedRows uint64
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
2020-08-05 15:24:51 +00:00
SearchTSIDsConcurrencyLimitReached uint64
SearchTSIDsConcurrencyLimitTimeout uint64
SearchTSIDsConcurrencyCapacity uint64
SearchTSIDsConcurrencyCurrent uint64
2020-07-05 16:37:38 +00:00
SearchDelays uint64
2020-05-15 10:44:23 +00:00
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
2020-05-15 11:11:39 +00:00
SlowMetricNameLoads uint64
2020-05-15 10:44:23 +00:00
2021-05-20 11:15:19 +00:00
HourlySeriesLimitRowsDropped uint64
DailySeriesLimitRowsDropped uint64
2020-09-09 20:18:32 +00:00
TimestampsBlocksMerged uint64
TimestampsBytesSaved uint64
2019-05-22 21:16:55 +00:00
TSIDCacheSize uint64
2019-07-09 21:47:29 +00:00
TSIDCacheSizeBytes uint64
2019-05-22 21:16:55 +00:00
TSIDCacheRequests uint64
TSIDCacheMisses uint64
TSIDCacheCollisions uint64
MetricIDCacheSize uint64
2019-07-09 21:47:29 +00:00
MetricIDCacheSizeBytes uint64
2019-05-22 21:16:55 +00:00
MetricIDCacheRequests uint64
MetricIDCacheMisses uint64
MetricIDCacheCollisions uint64
MetricNameCacheSize uint64
2019-07-09 21:47:29 +00:00
MetricNameCacheSizeBytes uint64
2019-05-22 21:16:55 +00:00
MetricNameCacheRequests uint64
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
2019-11-11 11:21:05 +00:00
DateMetricIDCacheSize uint64
2019-11-13 15:58:05 +00:00
DateMetricIDCacheSizeBytes uint64
2019-11-11 11:21:05 +00:00
DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheResetsCount uint64
2019-05-22 21:16:55 +00:00
2019-11-13 17:00:02 +00:00
HourMetricIDCacheSize uint64
HourMetricIDCacheSizeBytes uint64
2019-06-19 15:36:47 +00:00
2020-05-11 22:06:17 +00:00
NextDayMetricIDCacheSize uint64
NextDayMetricIDCacheSizeBytes uint64
2020-01-29 23:59:43 +00:00
PrefetchedMetricIDsSize uint64
PrefetchedMetricIDsSizeBytes uint64
2019-05-22 21:16:55 +00:00
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
// Reset resets m.
func ( m * Metrics ) Reset ( ) {
* m = Metrics { }
}
// UpdateMetrics updates m with metrics from s.
func ( s * Storage ) UpdateMetrics ( m * Metrics ) {
2020-10-09 10:35:48 +00:00
m . RowsAddedTotal = atomic . LoadUint64 ( & rowsAddedTotal )
2020-02-27 21:47:05 +00:00
m . DedupsDuringMerge = atomic . LoadUint64 ( & dedupsDuringMerge )
2019-07-26 11:10:25 +00:00
m . TooSmallTimestampRows += atomic . LoadUint64 ( & s . tooSmallTimestampRows )
m . TooBigTimestampRows += atomic . LoadUint64 ( & s . tooBigTimestampRows )
2019-08-06 11:09:17 +00:00
m . AddRowsConcurrencyLimitReached += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitReached )
m . AddRowsConcurrencyLimitTimeout += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitTimeout )
m . AddRowsConcurrencyDroppedRows += atomic . LoadUint64 ( & s . addRowsConcurrencyDroppedRows )
m . AddRowsConcurrencyCapacity = uint64 ( cap ( addRowsConcurrencyCh ) )
m . AddRowsConcurrencyCurrent = uint64 ( len ( addRowsConcurrencyCh ) )
2020-08-05 15:24:51 +00:00
m . SearchTSIDsConcurrencyLimitReached += atomic . LoadUint64 ( & s . searchTSIDsConcurrencyLimitReached )
m . SearchTSIDsConcurrencyLimitTimeout += atomic . LoadUint64 ( & s . searchTSIDsConcurrencyLimitTimeout )
m . SearchTSIDsConcurrencyCapacity = uint64 ( cap ( searchTSIDsConcurrencyCh ) )
m . SearchTSIDsConcurrencyCurrent = uint64 ( len ( searchTSIDsConcurrencyCh ) )
2020-07-22 21:58:48 +00:00
m . SearchDelays = storagepacelimiter . Search . DelaysTotal ( )
2020-07-05 16:37:38 +00:00
2020-05-15 10:44:23 +00:00
m . SlowRowInserts += atomic . LoadUint64 ( & s . slowRowInserts )
m . SlowPerDayIndexInserts += atomic . LoadUint64 ( & s . slowPerDayIndexInserts )
2020-05-15 11:11:39 +00:00
m . SlowMetricNameLoads += atomic . LoadUint64 ( & s . slowMetricNameLoads )
2020-05-15 10:44:23 +00:00
2021-05-20 11:15:19 +00:00
m . HourlySeriesLimitRowsDropped += atomic . LoadUint64 ( & s . hourlySeriesLimitRowsDropped )
m . DailySeriesLimitRowsDropped += atomic . LoadUint64 ( & s . dailySeriesLimitRowsDropped )
2020-09-09 20:18:32 +00:00
m . TimestampsBlocksMerged = atomic . LoadUint64 ( & timestampsBlocksMerged )
m . TimestampsBytesSaved = atomic . LoadUint64 ( & timestampsBytesSaved )
2019-05-22 21:16:55 +00:00
var cs fastcache . Stats
s . tsidCache . UpdateStats ( & cs )
m . TSIDCacheSize += cs . EntriesCount
2019-07-09 21:47:29 +00:00
m . TSIDCacheSizeBytes += cs . BytesSize
2019-05-22 21:16:55 +00:00
m . TSIDCacheRequests += cs . GetCalls
m . TSIDCacheMisses += cs . Misses
m . TSIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricIDCache . UpdateStats ( & cs )
m . MetricIDCacheSize += cs . EntriesCount
2019-07-09 21:47:29 +00:00
m . MetricIDCacheSizeBytes += cs . BytesSize
2019-05-22 21:16:55 +00:00
m . MetricIDCacheRequests += cs . GetCalls
m . MetricIDCacheMisses += cs . Misses
m . MetricIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricNameCache . UpdateStats ( & cs )
m . MetricNameCacheSize += cs . EntriesCount
2019-07-09 21:47:29 +00:00
m . MetricNameCacheSizeBytes += cs . BytesSize
2019-05-22 21:16:55 +00:00
m . MetricNameCacheRequests += cs . GetCalls
m . MetricNameCacheMisses += cs . Misses
m . MetricNameCacheCollisions += cs . Collisions
2019-11-09 21:05:14 +00:00
m . DateMetricIDCacheSize += uint64 ( s . dateMetricIDCache . EntriesCount ( ) )
2019-11-13 15:58:05 +00:00
m . DateMetricIDCacheSizeBytes += uint64 ( s . dateMetricIDCache . SizeBytes ( ) )
2019-11-11 11:21:05 +00:00
m . DateMetricIDCacheSyncsCount += atomic . LoadUint64 ( & s . dateMetricIDCache . syncsCount )
m . DateMetricIDCacheResetsCount += atomic . LoadUint64 ( & s . dateMetricIDCache . resetsCount )
2019-05-22 21:16:55 +00:00
2019-06-19 15:36:47 +00:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-09-24 18:10:22 +00:00
hourMetricIDsLen := hmPrev . m . Len ( )
if hmCurr . m . Len ( ) > hourMetricIDsLen {
hourMetricIDsLen = hmCurr . m . Len ( )
2019-06-19 15:36:47 +00:00
}
m . HourMetricIDCacheSize += uint64 ( hourMetricIDsLen )
2019-11-13 17:00:02 +00:00
m . HourMetricIDCacheSizeBytes += hmCurr . m . SizeBytes ( )
m . HourMetricIDCacheSizeBytes += hmPrev . m . SizeBytes ( )
2019-06-19 15:36:47 +00:00
2020-05-11 22:06:17 +00:00
nextDayMetricIDs := & s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry ) . v
m . NextDayMetricIDCacheSize += uint64 ( nextDayMetricIDs . Len ( ) )
m . NextDayMetricIDCacheSizeBytes += nextDayMetricIDs . SizeBytes ( )
2020-01-29 23:59:43 +00:00
prefetchedMetricIDs := s . prefetchedMetricIDs . Load ( ) . ( * uint64set . Set )
m . PrefetchedMetricIDsSize += uint64 ( prefetchedMetricIDs . Len ( ) )
m . PrefetchedMetricIDsSizeBytes += uint64 ( prefetchedMetricIDs . SizeBytes ( ) )
2019-05-22 21:16:55 +00:00
s . idb ( ) . UpdateMetrics ( & m . IndexDBMetrics )
s . tb . UpdateMetrics ( & m . TableMetrics )
}
func ( s * Storage ) startRetentionWatcher ( ) {
s . retentionWatcherWG . Add ( 1 )
go func ( ) {
s . retentionWatcher ( )
s . retentionWatcherWG . Done ( )
} ( )
}
func ( s * Storage ) retentionWatcher ( ) {
for {
2021-02-15 12:30:12 +00:00
d := nextRetentionDuration ( s . retentionMsecs )
2019-05-22 21:16:55 +00:00
select {
case <- s . stop :
return
case <- time . After ( d ) :
s . mustRotateIndexDB ( )
}
}
}
2019-06-09 16:06:53 +00:00
func ( s * Storage ) startCurrHourMetricIDsUpdater ( ) {
s . currHourMetricIDsUpdaterWG . Add ( 1 )
2019-06-02 15:34:08 +00:00
go func ( ) {
2019-06-09 16:06:53 +00:00
s . currHourMetricIDsUpdater ( )
s . currHourMetricIDsUpdaterWG . Done ( )
2019-06-02 15:34:08 +00:00
} ( )
}
2020-05-11 22:06:17 +00:00
func ( s * Storage ) startNextDayMetricIDsUpdater ( ) {
s . nextDayMetricIDsUpdaterWG . Add ( 1 )
go func ( ) {
s . nextDayMetricIDsUpdater ( )
s . nextDayMetricIDsUpdaterWG . Done ( )
} ( )
}
2019-06-09 16:06:53 +00:00
var currHourMetricIDsUpdateInterval = time . Second * 10
2019-06-02 18:58:14 +00:00
2019-06-09 16:06:53 +00:00
func ( s * Storage ) currHourMetricIDsUpdater ( ) {
2020-02-13 10:55:58 +00:00
ticker := time . NewTicker ( currHourMetricIDsUpdateInterval )
defer ticker . Stop ( )
2019-06-02 15:34:08 +00:00
for {
select {
case <- s . stop :
2019-11-08 11:16:40 +00:00
s . updateCurrHourMetricIDs ( )
2019-06-02 15:34:08 +00:00
return
2020-02-13 10:55:58 +00:00
case <- ticker . C :
2019-06-09 16:06:53 +00:00
s . updateCurrHourMetricIDs ( )
2019-06-02 15:34:08 +00:00
}
}
}
2020-05-11 22:06:17 +00:00
var nextDayMetricIDsUpdateInterval = time . Second * 11
func ( s * Storage ) nextDayMetricIDsUpdater ( ) {
ticker := time . NewTicker ( nextDayMetricIDsUpdateInterval )
defer ticker . Stop ( )
for {
select {
case <- s . stop :
s . updateNextDayMetricIDs ( )
return
case <- ticker . C :
s . updateNextDayMetricIDs ( )
}
}
}
2019-05-22 21:16:55 +00:00
func ( s * Storage ) mustRotateIndexDB ( ) {
// Create new indexdb table.
newTableName := nextIndexDBTableName ( )
idbNewPath := s . path + "/indexdb/" + newTableName
2021-02-10 12:37:14 +00:00
idbNew , err := openIndexDB ( idbNewPath , s . metricIDCache , s . metricNameCache , s . tsidCache , s . minTimestampForCompositeIndex )
2019-05-22 21:16:55 +00:00
if err != nil {
logger . Panicf ( "FATAL: cannot create new indexDB at %q: %s" , idbNewPath , err )
}
// Drop extDB
idbCurr := s . idb ( )
idbCurr . doExtDB ( func ( extDB * indexDB ) {
extDB . scheduleToDrop ( )
} )
idbCurr . SetExtDB ( nil )
// Start using idbNew
idbNew . SetExtDB ( idbCurr )
s . idbCurr . Store ( idbNew )
// Persist changes on the file system.
2019-06-11 20:13:04 +00:00
fs . MustSyncPath ( s . path )
2019-05-22 21:16:55 +00:00
// Flush tsidCache, so idbNew can be populated with fresh data.
s . tsidCache . Reset ( )
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s . dateMetricIDCache . Reset ( )
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
// from prev idb remain valid after the rotation.
2020-05-14 20:45:04 +00:00
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
2019-05-22 21:16:55 +00:00
}
// MustClose closes the storage.
2021-02-17 12:59:04 +00:00
//
// It is expected that the s is no longer used during the close.
2019-05-22 21:16:55 +00:00
func ( s * Storage ) MustClose ( ) {
close ( s . stop )
s . retentionWatcherWG . Wait ( )
2019-06-09 16:06:53 +00:00
s . currHourMetricIDsUpdaterWG . Wait ( )
2020-05-11 22:06:17 +00:00
s . nextDayMetricIDsUpdaterWG . Wait ( )
2019-05-22 21:16:55 +00:00
s . tb . MustClose ( )
s . idb ( ) . MustClose ( )
// Save caches.
2019-08-13 18:35:19 +00:00
s . mustSaveAndStopCache ( s . tsidCache , "MetricName->TSID" , "metricName_tsid" )
s . mustSaveAndStopCache ( s . metricIDCache , "MetricID->TSID" , "metricID_tsid" )
s . mustSaveAndStopCache ( s . metricNameCache , "MetricID->MetricName" , "metricID_metricName" )
2019-05-22 21:16:55 +00:00
2019-06-14 04:52:32 +00:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmCurr , "curr_hour_metric_ids" )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmPrev , "prev_hour_metric_ids" )
2020-05-11 22:06:17 +00:00
nextDayMetricIDs := s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry )
s . mustSaveNextDayMetricIDs ( nextDayMetricIDs )
2019-05-22 21:16:55 +00:00
// Release lock file.
if err := s . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close lock file %q: %s" , s . flockF . Name ( ) , err )
}
}
2020-05-11 22:06:17 +00:00
func ( s * Storage ) mustLoadNextDayMetricIDs ( date uint64 ) * byDateMetricIDEntry {
e := & byDateMetricIDEntry {
date : date ,
}
name := "next_day_metric_ids"
path := s . cachePath + "/" + name
logger . Infof ( "loading %s from %q..." , name , path )
startTime := time . Now ( )
if ! fs . IsPathExist ( path ) {
logger . Infof ( "nothing to load from %q" , path )
return e
}
src , err := ioutil . ReadFile ( path )
if err != nil {
logger . Panicf ( "FATAL: cannot read %s: %s" , path , err )
}
srcOrigLen := len ( src )
if len ( src ) < 16 {
logger . Errorf ( "discarding %s, since it has broken header; got %d bytes; want %d bytes" , path , len ( src ) , 16 )
return e
}
// Unmarshal header
dateLoaded := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if dateLoaded != date {
logger . Infof ( "discarding %s, since it contains data for stale date; got %d; want %d" , path , dateLoaded , date )
return e
}
// Unmarshal uint64set
m , tail , err := unmarshalUint64Set ( src )
if err != nil {
logger . Infof ( "discarding %s because cannot load uint64set: %s" , path , err )
return e
}
if len ( tail ) > 0 {
logger . Infof ( "discarding %s because non-empty tail left; len(tail)=%d" , path , len ( tail ) )
return e
}
e . v = * m
logger . Infof ( "loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , m . Len ( ) , srcOrigLen )
return e
}
2019-06-14 04:52:32 +00:00
func ( s * Storage ) mustLoadHourMetricIDs ( hour uint64 , name string ) * hourMetricIDs {
2020-05-11 22:06:17 +00:00
hm := & hourMetricIDs {
hour : hour ,
}
2019-06-14 04:52:32 +00:00
path := s . cachePath + "/" + name
logger . Infof ( "loading %s from %q..." , name , path )
startTime := time . Now ( )
if ! fs . IsPathExist ( path ) {
logger . Infof ( "nothing to load from %q" , path )
2020-05-11 22:06:17 +00:00
return hm
2019-06-14 04:52:32 +00:00
}
src , err := ioutil . ReadFile ( path )
if err != nil {
logger . Panicf ( "FATAL: cannot read %s: %s" , path , err )
}
srcOrigLen := len ( src )
if len ( src ) < 24 {
logger . Errorf ( "discarding %s, since it has broken header; got %d bytes; want %d bytes" , path , len ( src ) , 24 )
2020-05-11 22:06:17 +00:00
return hm
2019-06-14 04:52:32 +00:00
}
2019-11-08 17:37:16 +00:00
// Unmarshal header
2019-06-14 04:52:32 +00:00
isFull := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
hourLoaded := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if hourLoaded != hour {
2020-05-11 22:06:17 +00:00
logger . Infof ( "discarding %s, since it contains outdated hour; got %d; want %d" , path , hourLoaded , hour )
return hm
2019-06-14 04:52:32 +00:00
}
2019-11-08 17:37:16 +00:00
2020-05-11 22:06:17 +00:00
// Unmarshal uint64set
m , tail , err := unmarshalUint64Set ( src )
if err != nil {
logger . Infof ( "discarding %s because cannot load uint64set: %s" , path , err )
return hm
2019-06-14 04:52:32 +00:00
}
2020-05-11 22:06:17 +00:00
if len ( tail ) > 0 {
logger . Infof ( "discarding %s because non-empty tail left; len(tail)=%d" , path , len ( tail ) )
return hm
2019-06-14 04:52:32 +00:00
}
2020-05-11 22:06:17 +00:00
hm . m = m
hm . isFull = isFull != 0
logger . Infof ( "loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , m . Len ( ) , srcOrigLen )
return hm
}
2019-11-13 11:11:02 +00:00
2020-05-11 22:06:17 +00:00
func ( s * Storage ) mustSaveNextDayMetricIDs ( e * byDateMetricIDEntry ) {
name := "next_day_metric_ids"
path := s . cachePath + "/" + name
logger . Infof ( "saving %s to %q..." , name , path )
startTime := time . Now ( )
dst := make ( [ ] byte , 0 , e . v . Len ( ) * 8 + 16 )
// Marshal header
dst = encoding . MarshalUint64 ( dst , e . date )
// Marshal e.v
dst = marshalUint64Set ( dst , & e . v )
if err := ioutil . WriteFile ( path , dst , 0644 ) ; err != nil {
logger . Panicf ( "FATAL: cannot write %d bytes to %q: %s" , len ( dst ) , path , err )
2019-06-14 04:52:32 +00:00
}
2020-05-11 22:06:17 +00:00
logger . Infof ( "saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , e . v . Len ( ) , len ( dst ) )
2019-06-14 04:52:32 +00:00
}
func ( s * Storage ) mustSaveHourMetricIDs ( hm * hourMetricIDs , name string ) {
path := s . cachePath + "/" + name
logger . Infof ( "saving %s to %q..." , name , path )
startTime := time . Now ( )
2019-09-24 18:10:22 +00:00
dst := make ( [ ] byte , 0 , hm . m . Len ( ) * 8 + 24 )
2019-06-14 04:52:32 +00:00
isFull := uint64 ( 0 )
if hm . isFull {
isFull = 1
}
2019-11-08 17:37:16 +00:00
// Marshal header
2019-06-14 04:52:32 +00:00
dst = encoding . MarshalUint64 ( dst , isFull )
dst = encoding . MarshalUint64 ( dst , hm . hour )
2019-11-08 17:37:16 +00:00
// Marshal hm.m
2020-05-11 22:06:17 +00:00
dst = marshalUint64Set ( dst , hm . m )
2019-11-13 11:11:02 +00:00
2019-06-14 04:52:32 +00:00
if err := ioutil . WriteFile ( path , dst , 0644 ) ; err != nil {
logger . Panicf ( "FATAL: cannot write %d bytes to %q: %s" , len ( dst ) , path , err )
}
2020-01-22 16:27:44 +00:00
logger . Infof ( "saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , hm . m . Len ( ) , len ( dst ) )
2019-06-14 04:52:32 +00:00
}
2020-05-11 22:06:17 +00:00
func unmarshalUint64Set ( src [ ] byte ) ( * uint64set . Set , [ ] byte , error ) {
mLen := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if uint64 ( len ( src ) ) < 8 * mLen {
return nil , nil , fmt . Errorf ( "cannot unmarshal uint64set; got %d bytes; want at least %d bytes" , len ( src ) , 8 * mLen )
}
m := & uint64set . Set { }
for i := uint64 ( 0 ) ; i < mLen ; i ++ {
metricID := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
m . Add ( metricID )
}
return m , src , nil
}
func marshalUint64Set ( dst [ ] byte , m * uint64set . Set ) [ ] byte {
dst = encoding . MarshalUint64 ( dst , uint64 ( m . Len ( ) ) )
m . ForEach ( func ( part [ ] uint64 ) bool {
for _ , metricID := range part {
dst = encoding . MarshalUint64 ( dst , metricID )
}
return true
} )
return dst
}
2021-02-10 12:37:14 +00:00
func mustGetMinTimestampForCompositeIndex ( metadataDir string , isEmptyDB bool ) int64 {
path := metadataDir + "/minTimestampForCompositeIndex"
minTimestamp , err := loadMinTimestampForCompositeIndex ( path )
if err == nil {
return minTimestamp
}
2021-02-10 13:56:07 +00:00
if ! os . IsNotExist ( err ) {
logger . Errorf ( "cannot read minTimestampForCompositeIndex, so trying to re-create it; error: %s" , err )
}
2021-02-10 12:37:14 +00:00
date := time . Now ( ) . UnixNano ( ) / 1e6 / msecPerDay
if ! isEmptyDB {
// The current and the next day can already contain non-composite indexes,
// so they cannot be queried with composite indexes.
date += 2
} else {
date = 0
}
minTimestamp = date * msecPerDay
dateBuf := encoding . MarshalInt64 ( nil , minTimestamp )
if err := os . RemoveAll ( path ) ; err != nil {
logger . Fatalf ( "cannot remove a file with minTimestampForCompositeIndex: %s" , err )
}
if err := fs . WriteFileAtomically ( path , dateBuf ) ; err != nil {
logger . Fatalf ( "cannot store minTimestampForCompositeIndex: %s" , err )
}
return minTimestamp
}
func loadMinTimestampForCompositeIndex ( path string ) ( int64 , error ) {
data , err := ioutil . ReadFile ( path )
if err != nil {
return 0 , err
}
if len ( data ) != 8 {
return 0 , fmt . Errorf ( "unexpected length of %q; got %d bytes; want 8 bytes" , path , len ( data ) )
}
return encoding . UnmarshalInt64 ( data ) , nil
}
2019-08-13 18:35:19 +00:00
func ( s * Storage ) mustLoadCache ( info , name string , sizeBytes int ) * workingsetcache . Cache {
2019-05-22 21:16:55 +00:00
path := s . cachePath + "/" + name
logger . Infof ( "loading %s cache from %q..." , info , path )
startTime := time . Now ( )
2019-08-13 18:35:19 +00:00
c := workingsetcache . Load ( path , sizeBytes , time . Hour )
2019-05-22 21:16:55 +00:00
var cs fastcache . Stats
c . UpdateStats ( & cs )
2020-01-22 16:27:44 +00:00
logger . Infof ( "loaded %s cache from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" ,
info , path , time . Since ( startTime ) . Seconds ( ) , cs . EntriesCount , cs . BytesSize )
2019-05-22 21:16:55 +00:00
return c
}
2019-08-13 18:35:19 +00:00
func ( s * Storage ) mustSaveAndStopCache ( c * workingsetcache . Cache , info , name string ) {
2019-05-22 21:16:55 +00:00
path := s . cachePath + "/" + name
logger . Infof ( "saving %s cache to %q..." , info , path )
startTime := time . Now ( )
2019-08-13 18:35:19 +00:00
if err := c . Save ( path ) ; err != nil {
2019-05-22 21:16:55 +00:00
logger . Panicf ( "FATAL: cannot save %s cache to %q: %s" , info , path , err )
}
var cs fastcache . Stats
c . UpdateStats ( & cs )
2019-08-13 18:35:19 +00:00
c . Stop ( )
2020-01-22 16:27:44 +00:00
logger . Infof ( "saved %s cache to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" ,
info , path , time . Since ( startTime ) . Seconds ( ) , cs . EntriesCount , cs . BytesSize )
2019-05-22 21:16:55 +00:00
}
2021-02-15 12:30:12 +00:00
func nextRetentionDuration ( retentionMsecs int64 ) time . Duration {
// Round retentionMsecs to days. This guarantees that per-day inverted index works as expected.
2021-02-15 12:32:57 +00:00
retentionMsecs = ( ( retentionMsecs + msecPerDay - 1 ) / msecPerDay ) * msecPerDay
2021-02-15 12:30:12 +00:00
t := time . Now ( ) . UnixNano ( ) / 1e6
2021-02-15 12:32:57 +00:00
deadline := ( ( t + retentionMsecs - 1 ) / retentionMsecs ) * retentionMsecs
2019-12-02 12:42:26 +00:00
// Schedule the deadline to +4 hours from the next retention period start.
// This should prevent from possible double deletion of indexdb
// due to time drift - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 .
2021-02-15 12:30:12 +00:00
deadline += 4 * 3600 * 1000
return time . Duration ( deadline - t ) * time . Millisecond
2019-05-22 21:16:55 +00:00
}
2020-11-16 08:55:55 +00:00
// SearchMetricNames returns metric names matching the given tfss on the given tr.
func ( s * Storage ) SearchMetricNames ( tfss [ ] * TagFilters , tr TimeRange , maxMetrics int , deadline uint64 ) ( [ ] MetricName , error ) {
tsids , err := s . searchTSIDs ( tfss , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
if err = s . prefetchMetricNames ( tsids , deadline ) ; err != nil {
return nil , err
}
idb := s . idb ( )
mns := make ( [ ] MetricName , 0 , len ( tsids ) )
var metricName [ ] byte
for i := range tsids {
2021-03-22 21:02:37 +00:00
if i & paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace ( deadline ) ; err != nil {
return nil , err
}
}
2020-11-16 08:55:55 +00:00
metricID := tsids [ i ] . MetricID
var err error
2021-03-22 20:41:47 +00:00
metricName , err = idb . searchMetricNameWithCache ( metricName [ : 0 ] , metricID )
2020-11-16 08:55:55 +00:00
if err != nil {
if err == io . EOF {
// Skip missing metricName for metricID.
// It should be automatically fixed. See indexDB.searchMetricName for details.
continue
}
return nil , fmt . Errorf ( "error when searching metricName for metricID=%d: %w" , metricID , err )
}
mns = mns [ : len ( mns ) + 1 ]
mn := & mns [ len ( mns ) - 1 ]
if err = mn . Unmarshal ( metricName ) ; err != nil {
return nil , fmt . Errorf ( "cannot unmarshal metricName=%q: %w" , metricName , err )
}
}
return mns , nil
}
2019-09-23 19:34:04 +00:00
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
2020-07-23 17:42:57 +00:00
func ( s * Storage ) searchTSIDs ( tfss [ ] * TagFilters , tr TimeRange , maxMetrics int , deadline uint64 ) ( [ ] TSID , error ) {
2019-05-22 21:16:55 +00:00
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
2020-08-05 15:24:51 +00:00
// Limit the number of concurrent goroutines that may search TSIDS in the storage.
// This should prevent from out of memory errors and CPU trashing when too many
// goroutines call searchTSIDs.
select {
case searchTSIDsConcurrencyCh <- struct { } { } :
default :
// Sleep for a while until giving up
atomic . AddUint64 ( & s . searchTSIDsConcurrencyLimitReached , 1 )
currentTime := fasttime . UnixTimestamp ( )
timeoutSecs := uint64 ( 0 )
if currentTime < deadline {
timeoutSecs = deadline - currentTime
}
timeout := time . Second * time . Duration ( timeoutSecs )
t := timerpool . Get ( timeout )
select {
case searchTSIDsConcurrencyCh <- struct { } { } :
timerpool . Put ( t )
case <- t . C :
timerpool . Put ( t )
atomic . AddUint64 ( & s . searchTSIDsConcurrencyLimitTimeout , 1 )
return nil , fmt . Errorf ( "cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load" ,
cap ( searchTSIDsConcurrencyCh ) , timeout . Seconds ( ) )
}
}
2020-07-23 17:42:57 +00:00
tsids , err := s . idb ( ) . searchTSIDs ( tfss , tr , maxMetrics , deadline )
2020-08-05 15:24:51 +00:00
<- searchTSIDsConcurrencyCh
2019-05-22 21:16:55 +00:00
if err != nil {
2020-08-10 10:36:00 +00:00
return nil , fmt . Errorf ( "error when searching tsids: %w" , err )
2019-05-22 21:16:55 +00:00
}
return tsids , nil
}
2020-08-05 15:24:51 +00:00
var (
// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
// is CPU bound and sometimes disk IO bound, so there is no sense in running more
// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
2020-12-08 18:49:32 +00:00
searchTSIDsConcurrencyCh = make ( chan struct { } , cgroup . AvailableCPUs ( ) * 2 )
2020-08-05 15:24:51 +00:00
)
2020-01-29 23:59:43 +00:00
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
//
2021-03-22 20:41:47 +00:00
// This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids.
2020-07-23 17:42:57 +00:00
func ( s * Storage ) prefetchMetricNames ( tsids [ ] TSID , deadline uint64 ) error {
2020-07-23 16:21:49 +00:00
if len ( tsids ) == 0 {
return nil
}
2020-01-29 23:59:43 +00:00
var metricIDs uint64Sorter
prefetchedMetricIDs := s . prefetchedMetricIDs . Load ( ) . ( * uint64set . Set )
for i := range tsids {
2020-07-23 16:21:49 +00:00
tsid := & tsids [ i ]
metricID := tsid . MetricID
2020-01-29 23:59:43 +00:00
if prefetchedMetricIDs . Has ( metricID ) {
continue
}
metricIDs = append ( metricIDs , metricID )
}
if len ( metricIDs ) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline.
return nil
}
2020-05-16 07:21:17 +00:00
atomic . AddUint64 ( & s . slowMetricNameLoads , uint64 ( len ( metricIDs ) ) )
2020-01-29 23:59:43 +00:00
// Pre-fetch metricIDs.
sort . Sort ( metricIDs )
2021-03-22 20:41:47 +00:00
var missingMetricIDs [ ] uint64
2020-01-29 23:59:43 +00:00
var metricName [ ] byte
var err error
2020-01-30 22:54:28 +00:00
idb := s . idb ( )
2020-07-23 17:42:57 +00:00
is := idb . getIndexSearch ( deadline )
2020-01-30 22:54:28 +00:00
defer idb . putIndexSearch ( is )
2020-07-23 16:21:49 +00:00
for loops , metricID := range metricIDs {
2020-08-07 05:37:33 +00:00
if loops & paceLimiterSlowIterationsMask == 0 {
2020-07-23 17:42:57 +00:00
if err := checkSearchDeadlineAndPace ( is . deadline ) ; err != nil {
return err
}
2020-07-23 16:21:49 +00:00
}
2021-03-22 20:41:47 +00:00
metricName , err = is . searchMetricNameWithCache ( metricName [ : 0 ] , metricID )
if err != nil {
if err == io . EOF {
missingMetricIDs = append ( missingMetricIDs , metricID )
continue
}
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "error in pre-fetching metricName for metricID=%d: %w" , metricID , err )
2020-01-29 23:59:43 +00:00
}
}
2021-03-22 20:41:47 +00:00
idb . doExtDB ( func ( extDB * indexDB ) {
is := extDB . getIndexSearch ( deadline )
defer extDB . putIndexSearch ( is )
for loops , metricID := range missingMetricIDs {
if loops & paceLimiterSlowIterationsMask == 0 {
if err = checkSearchDeadlineAndPace ( is . deadline ) ; err != nil {
return
}
}
metricName , err = is . searchMetricNameWithCache ( metricName [ : 0 ] , metricID )
if err != nil && err != io . EOF {
err = fmt . Errorf ( "error in pre-fetching metricName for metricID=%d in extDB: %w" , metricID , err )
return
}
}
} )
if err != nil {
return err
}
2020-01-29 23:59:43 +00:00
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
2020-08-06 13:48:21 +00:00
2020-01-29 23:59:43 +00:00
prefetchedMetricIDsNew := prefetchedMetricIDs . Clone ( )
2020-07-21 17:56:49 +00:00
prefetchedMetricIDsNew . AddMulti ( metricIDs )
2020-08-06 13:48:21 +00:00
if prefetchedMetricIDsNew . SizeBytes ( ) > uint64 ( memory . Allowed ( ) ) / 32 {
// Reset prefetchedMetricIDsNew if it occupies too much space.
prefetchedMetricIDsNew = & uint64set . Set { }
}
2020-01-29 23:59:43 +00:00
s . prefetchedMetricIDs . Store ( prefetchedMetricIDsNew )
return nil
}
2020-08-10 10:17:12 +00:00
// ErrDeadlineExceeded is returned when the request times out.
var ErrDeadlineExceeded = fmt . Errorf ( "deadline exceeded" )
2020-07-23 17:42:57 +00:00
2019-05-22 21:16:55 +00:00
// DeleteMetrics deletes all the metrics matching the given tfss.
//
// Returns the number of metrics deleted.
func ( s * Storage ) DeleteMetrics ( tfss [ ] * TagFilters ) ( int , error ) {
deletedCount , err := s . idb ( ) . DeleteTSIDs ( tfss )
if err != nil {
2020-06-30 19:58:18 +00:00
return deletedCount , fmt . Errorf ( "cannot delete tsids: %w" , err )
2019-05-22 21:16:55 +00:00
}
2020-07-14 11:02:14 +00:00
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
2020-07-06 18:56:14 +00:00
2020-07-14 11:02:14 +00:00
// Do not reset MetricID->MetricName cache, since it must be used only
2019-05-22 21:16:55 +00:00
// after filtering out deleted metricIDs.
2020-07-06 18:56:14 +00:00
2019-05-22 21:16:55 +00:00
return deletedCount , nil
}
2020-11-04 22:15:43 +00:00
// SearchTagKeysOnTimeRange searches for tag keys on tr.
func ( s * Storage ) SearchTagKeysOnTimeRange ( tr TimeRange , maxTagKeys int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagKeysOnTimeRange ( tr , maxTagKeys , deadline )
}
2019-05-22 21:16:55 +00:00
// SearchTagKeys searches for tag keys
2020-07-23 17:42:57 +00:00
func ( s * Storage ) SearchTagKeys ( maxTagKeys int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagKeys ( maxTagKeys , deadline )
2019-05-22 21:16:55 +00:00
}
2020-11-04 22:15:43 +00:00
// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
func ( s * Storage ) SearchTagValuesOnTimeRange ( tagKey [ ] byte , tr TimeRange , maxTagValues int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagValuesOnTimeRange ( tagKey , tr , maxTagValues , deadline )
}
2019-05-22 21:16:55 +00:00
// SearchTagValues searches for tag values for the given tagKey
2020-07-23 17:42:57 +00:00
func ( s * Storage ) SearchTagValues ( tagKey [ ] byte , maxTagValues int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagValues ( tagKey , maxTagValues , deadline )
2019-05-22 21:16:55 +00:00
}
2020-09-10 21:28:19 +00:00
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
//
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
2021-02-02 22:24:05 +00:00
//
// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned.
2020-09-10 21:28:19 +00:00
func ( s * Storage ) SearchTagValueSuffixes ( tr TimeRange , tagKey , tagValuePrefix [ ] byte , delimiter byte , maxTagValueSuffixes int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagValueSuffixes ( tr , tagKey , tagValuePrefix , delimiter , maxTagValueSuffixes , deadline )
}
2021-02-02 22:24:05 +00:00
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
func ( s * Storage ) SearchGraphitePaths ( tr TimeRange , query [ ] byte , maxPaths int , deadline uint64 ) ( [ ] string , error ) {
2021-03-18 12:52:49 +00:00
return s . searchGraphitePaths ( tr , nil , query , maxPaths , deadline )
}
func ( s * Storage ) searchGraphitePaths ( tr TimeRange , qHead , qTail [ ] byte , maxPaths int , deadline uint64 ) ( [ ] string , error ) {
2021-03-18 13:21:13 +00:00
n := bytes . IndexAny ( qTail , "*[{" )
2021-02-02 22:24:05 +00:00
if n < 0 {
2021-03-18 12:52:49 +00:00
// Verify that qHead matches a metric name.
qHead = append ( qHead , qTail ... )
suffixes , err := s . SearchTagValueSuffixes ( tr , nil , qHead , '.' , 1 , deadline )
2021-02-02 22:24:05 +00:00
if err != nil {
return nil , err
}
if len ( suffixes ) == 0 {
// The query doesn't match anything.
return nil , nil
}
if len ( suffixes [ 0 ] ) > 0 {
// The query matches a metric name with additional suffix.
return nil , nil
}
2021-03-18 12:52:49 +00:00
return [ ] string { string ( qHead ) } , nil
2021-02-02 22:24:05 +00:00
}
2021-03-18 12:52:49 +00:00
qHead = append ( qHead , qTail [ : n ] ... )
suffixes , err := s . SearchTagValueSuffixes ( tr , nil , qHead , '.' , maxPaths , deadline )
2021-02-02 22:24:05 +00:00
if err != nil {
return nil , err
}
if len ( suffixes ) == 0 {
return nil , nil
}
if len ( suffixes ) >= maxPaths {
return nil , fmt . Errorf ( "more than maxPaths=%d suffixes found" , maxPaths )
}
2021-03-18 12:52:49 +00:00
qNode := qTail [ n : ]
qTail = nil
2021-02-02 22:24:05 +00:00
mustMatchLeafs := true
2021-03-18 12:52:49 +00:00
if m := bytes . IndexByte ( qNode , '.' ) ; m >= 0 {
2021-02-02 22:24:05 +00:00
qTail = qNode [ m + 1 : ]
2021-02-03 16:45:42 +00:00
qNode = qNode [ : m + 1 ]
2021-02-02 22:24:05 +00:00
mustMatchLeafs = false
}
2021-03-18 12:52:49 +00:00
re , err := getRegexpForGraphiteQuery ( string ( qNode ) )
2021-02-02 22:24:05 +00:00
if err != nil {
return nil , err
}
2021-03-18 12:52:49 +00:00
qHeadLen := len ( qHead )
2021-02-02 22:24:05 +00:00
var paths [ ] string
for _ , suffix := range suffixes {
if len ( paths ) > maxPaths {
2021-03-18 12:52:49 +00:00
return nil , fmt . Errorf ( "more than maxPath=%d paths found" , maxPaths )
2021-02-02 22:24:05 +00:00
}
if ! re . MatchString ( suffix ) {
continue
}
if mustMatchLeafs {
2021-03-18 12:52:49 +00:00
qHead = append ( qHead [ : qHeadLen ] , suffix ... )
paths = append ( paths , string ( qHead ) )
2021-02-02 22:24:05 +00:00
continue
}
2021-03-18 12:52:49 +00:00
qHead = append ( qHead [ : qHeadLen ] , suffix ... )
ps , err := s . searchGraphitePaths ( tr , qHead , qTail , maxPaths , deadline )
2021-02-02 22:24:05 +00:00
if err != nil {
return nil , err
}
paths = append ( paths , ps ... )
}
return paths , nil
}
2021-02-03 18:12:17 +00:00
func getRegexpForGraphiteQuery ( q string ) ( * regexp . Regexp , error ) {
parts , tail := getRegexpPartsForGraphiteQuery ( q )
if len ( tail ) > 0 {
return nil , fmt . Errorf ( "unexpected tail left after parsing %q: %q" , q , tail )
}
2021-02-02 22:24:05 +00:00
reStr := "^" + strings . Join ( parts , "" ) + "$"
return regexp . Compile ( reStr )
}
2021-02-03 18:12:17 +00:00
func getRegexpPartsForGraphiteQuery ( q string ) ( [ ] string , string ) {
2021-02-02 22:24:05 +00:00
var parts [ ] string
for {
2021-02-03 18:12:17 +00:00
n := strings . IndexAny ( q , "*{}[," )
2021-02-02 22:24:05 +00:00
if n < 0 {
2021-02-03 18:12:17 +00:00
parts = append ( parts , regexp . QuoteMeta ( q ) )
return parts , ""
2021-02-02 22:24:05 +00:00
}
parts = append ( parts , regexp . QuoteMeta ( q [ : n ] ) )
q = q [ n : ]
switch q [ 0 ] {
2021-02-03 18:12:17 +00:00
case ',' , '}' :
return parts , q
2021-02-02 22:24:05 +00:00
case '*' :
parts = append ( parts , "[^.]*" )
q = q [ 1 : ]
case '{' :
var tmp [ ] string
2021-02-03 18:12:17 +00:00
for {
a , tail := getRegexpPartsForGraphiteQuery ( q [ 1 : ] )
tmp = append ( tmp , strings . Join ( a , "" ) )
if len ( tail ) == 0 {
parts = append ( parts , regexp . QuoteMeta ( "{" ) )
parts = append ( parts , strings . Join ( tmp , "," ) )
return parts , ""
}
if tail [ 0 ] == ',' {
q = tail
continue
}
if tail [ 0 ] == '}' {
if len ( tmp ) == 1 {
parts = append ( parts , tmp [ 0 ] )
} else {
parts = append ( parts , "(?:" + strings . Join ( tmp , "|" ) + ")" )
}
q = tail [ 1 : ]
break
}
logger . Panicf ( "BUG: unexpected first char at tail %q; want `.` or `}`" , tail )
2021-02-02 22:24:05 +00:00
}
case '[' :
n := strings . IndexByte ( q , ']' )
if n < 0 {
2021-02-03 18:12:17 +00:00
parts = append ( parts , regexp . QuoteMeta ( q ) )
return parts , ""
2021-02-02 22:24:05 +00:00
}
parts = append ( parts , q [ : n + 1 ] )
q = q [ n + 1 : ]
}
}
}
2019-11-21 21:18:22 +00:00
// SearchTagEntries returns a list of (tagName -> tagValues)
2020-07-23 17:42:57 +00:00
func ( s * Storage ) SearchTagEntries ( maxTagKeys , maxTagValues int , deadline uint64 ) ( [ ] TagEntry , error ) {
2019-06-10 15:55:20 +00:00
idb := s . idb ( )
2020-07-23 17:42:57 +00:00
keys , err := idb . SearchTagKeys ( maxTagKeys , deadline )
2019-06-10 15:55:20 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot search tag keys: %w" , err )
2019-06-10 15:55:20 +00:00
}
// Sort keys for faster seeks below
sort . Strings ( keys )
tes := make ( [ ] TagEntry , len ( keys ) )
for i , key := range keys {
2020-07-23 17:42:57 +00:00
values , err := idb . SearchTagValues ( [ ] byte ( key ) , maxTagValues , deadline )
2019-06-10 15:55:20 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot search values for tag %q: %w" , key , err )
2019-06-10 15:55:20 +00:00
}
te := & tes [ i ]
te . Key = key
te . Values = values
}
return tes , nil
}
// TagEntry contains (tagName -> tagValues) mapping
type TagEntry struct {
// Key is tagName
Key string
// Values contains all the values for Key.
Values [ ] string
}
2019-05-22 21:16:55 +00:00
// GetSeriesCount returns the approximate number of unique time series.
//
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
2020-07-23 17:42:57 +00:00
func ( s * Storage ) GetSeriesCount ( deadline uint64 ) ( uint64 , error ) {
return s . idb ( ) . GetSeriesCount ( deadline )
2019-05-22 21:16:55 +00:00
}
2021-05-12 13:32:48 +00:00
// GetTSDBStatusWithFiltersForDate returns TSDB status data for /api/v1/status/tsdb with match[] filters.
func ( s * Storage ) GetTSDBStatusWithFiltersForDate ( tfss [ ] * TagFilters , date uint64 , topN int , deadline uint64 ) ( * TSDBStatus , error ) {
return s . idb ( ) . GetTSDBStatusWithFiltersForDate ( tfss , date , topN , deadline )
2021-05-12 12:18:45 +00:00
}
2019-05-22 21:16:55 +00:00
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded
2021-05-08 14:55:44 +00:00
// with MetricName.UnmarshalRaw.
2019-05-22 21:16:55 +00:00
MetricNameRaw [ ] byte
Timestamp int64
Value float64
}
// CopyFrom copies src to mr.
func ( mr * MetricRow ) CopyFrom ( src * MetricRow ) {
mr . MetricNameRaw = append ( mr . MetricNameRaw [ : 0 ] , src . MetricNameRaw ... )
mr . Timestamp = src . Timestamp
mr . Value = src . Value
}
// String returns string representation of the mr.
func ( mr * MetricRow ) String ( ) string {
metricName := string ( mr . MetricNameRaw )
var mn MetricName
2021-05-08 14:55:44 +00:00
if err := mn . UnmarshalRaw ( mr . MetricNameRaw ) ; err == nil {
2019-05-22 21:16:55 +00:00
metricName = mn . String ( )
}
2021-03-25 19:30:41 +00:00
return fmt . Sprintf ( "%s (Timestamp=%d, Value=%f)" , metricName , mr . Timestamp , mr . Value )
2019-05-22 21:16:55 +00:00
}
// Marshal appends marshaled mr to dst and returns the result.
func ( mr * MetricRow ) Marshal ( dst [ ] byte ) [ ] byte {
dst = encoding . MarshalBytes ( dst , mr . MetricNameRaw )
dst = encoding . MarshalUint64 ( dst , uint64 ( mr . Timestamp ) )
dst = encoding . MarshalUint64 ( dst , math . Float64bits ( mr . Value ) )
return dst
}
2021-05-08 14:55:44 +00:00
// UnmarshalX unmarshals mr from src and returns the remaining tail from src.
//
// mr refers to src, so it remains valid until src changes.
func ( mr * MetricRow ) UnmarshalX ( src [ ] byte ) ( [ ] byte , error ) {
2019-05-22 21:16:55 +00:00
tail , metricNameRaw , err := encoding . UnmarshalBytes ( src )
if err != nil {
2020-06-30 19:58:18 +00:00
return tail , fmt . Errorf ( "cannot unmarshal MetricName: %w" , err )
2019-05-22 21:16:55 +00:00
}
2021-05-08 14:55:44 +00:00
mr . MetricNameRaw = metricNameRaw
2019-05-22 21:16:55 +00:00
if len ( tail ) < 8 {
return tail , fmt . Errorf ( "cannot unmarshal Timestamp: want %d bytes; have %d bytes" , 8 , len ( tail ) )
}
timestamp := encoding . UnmarshalUint64 ( tail )
mr . Timestamp = int64 ( timestamp )
tail = tail [ 8 : ]
if len ( tail ) < 8 {
return tail , fmt . Errorf ( "cannot unmarshal Value: want %d bytes; have %d bytes" , 8 , len ( tail ) )
}
value := encoding . UnmarshalUint64 ( tail )
mr . Value = math . Float64frombits ( value )
tail = tail [ 8 : ]
return tail , nil
}
2020-09-17 09:01:53 +00:00
// ForceMergePartitions force-merges partitions in s with names starting from the given partitionNamePrefix.
//
// Partitions are merged sequentially in order to reduce load on the system.
func ( s * Storage ) ForceMergePartitions ( partitionNamePrefix string ) error {
return s . tb . ForceMergePartitions ( partitionNamePrefix )
}
2020-10-09 10:35:48 +00:00
var rowsAddedTotal uint64
2019-05-22 21:16:55 +00:00
// AddRows adds the given mrs to s.
func ( s * Storage ) AddRows ( mrs [ ] MetricRow , precisionBits uint8 ) error {
if len ( mrs ) == 0 {
return nil
}
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU trashing when too many
// goroutines call AddRows.
select {
case addRowsConcurrencyCh <- struct { } { } :
2019-08-06 11:09:17 +00:00
default :
// Sleep for a while until giving up
atomic . AddUint64 ( & s . addRowsConcurrencyLimitReached , 1 )
t := timerpool . Get ( addRowsTimeout )
2020-08-07 05:47:32 +00:00
// Prioritize data ingestion over concurrent searches.
storagepacelimiter . Search . Inc ( )
2019-08-06 11:09:17 +00:00
select {
case addRowsConcurrencyCh <- struct { } { } :
timerpool . Put ( t )
2020-08-07 05:47:32 +00:00
storagepacelimiter . Search . Dec ( )
2019-08-06 11:09:17 +00:00
case <- t . C :
timerpool . Put ( t )
2020-08-07 05:47:32 +00:00
storagepacelimiter . Search . Dec ( )
2019-08-06 11:09:17 +00:00
atomic . AddUint64 ( & s . addRowsConcurrencyLimitTimeout , 1 )
atomic . AddUint64 ( & s . addRowsConcurrencyDroppedRows , uint64 ( len ( mrs ) ) )
2020-08-05 15:24:51 +00:00
return fmt . Errorf ( "cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load" ,
2019-08-06 11:09:17 +00:00
len ( mrs ) , addRowsTimeout , cap ( addRowsConcurrencyCh ) )
}
2019-05-22 21:16:55 +00:00
}
2021-05-24 12:24:04 +00:00
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
2021-05-24 12:30:39 +00:00
var firstErr error
2021-05-24 12:24:04 +00:00
ic := getMetricRowsInsertCtx ( )
maxBlockLen := len ( ic . rrs )
2021-05-24 12:30:39 +00:00
for len ( mrs ) > 0 {
2021-05-24 12:24:04 +00:00
mrsBlock := mrs
if len ( mrs ) > maxBlockLen {
mrsBlock = mrs [ : maxBlockLen ]
mrs = mrs [ maxBlockLen : ]
} else {
mrs = nil
}
2021-05-24 12:30:39 +00:00
if err := s . add ( ic . rrs , ic . tmpMrs , mrsBlock , precisionBits ) ; err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
2021-05-24 12:24:04 +00:00
atomic . AddUint64 ( & rowsAddedTotal , uint64 ( len ( mrsBlock ) ) )
}
putMetricRowsInsertCtx ( ic )
2019-05-22 21:16:55 +00:00
2020-07-05 16:37:38 +00:00
<- addRowsConcurrencyCh
2021-05-24 12:30:39 +00:00
return firstErr
2019-05-22 21:16:55 +00:00
}
2021-05-24 12:24:04 +00:00
type metricRowsInsertCtx struct {
rrs [ ] rawRow
tmpMrs [ ] * MetricRow
}
func getMetricRowsInsertCtx ( ) * metricRowsInsertCtx {
v := metricRowsInsertCtxPool . Get ( )
if v == nil {
v = & metricRowsInsertCtx {
rrs : make ( [ ] rawRow , maxMetricRowsPerBlock ) ,
tmpMrs : make ( [ ] * MetricRow , maxMetricRowsPerBlock ) ,
}
}
return v . ( * metricRowsInsertCtx )
}
func putMetricRowsInsertCtx ( ic * metricRowsInsertCtx ) {
tmpMrs := ic . tmpMrs
for i := range tmpMrs {
tmpMrs [ i ] = nil
}
metricRowsInsertCtxPool . Put ( ic )
}
var metricRowsInsertCtxPool sync . Pool
const maxMetricRowsPerBlock = 8000
2019-05-22 21:16:55 +00:00
var (
2020-07-08 14:29:57 +00:00
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
// goroutines on data ingestion path.
2020-12-08 18:49:32 +00:00
addRowsConcurrencyCh = make ( chan struct { } , cgroup . AvailableCPUs ( ) )
2019-05-22 21:16:55 +00:00
addRowsTimeout = 30 * time . Second
)
2020-11-15 22:42:27 +00:00
// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
//
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
// Th MetricRow.Value field is ignored.
func ( s * Storage ) RegisterMetricNames ( mrs [ ] MetricRow ) error {
var (
tsid TSID
metricName [ ] byte
)
2021-05-23 13:39:55 +00:00
mn := GetMetricName ( )
defer PutMetricName ( mn )
2020-11-15 22:42:27 +00:00
idb := s . idb ( )
is := idb . getIndexSearch ( noDeadline )
defer idb . putIndexSearch ( is )
for i := range mrs {
mr := & mrs [ i ]
if s . getTSIDFromCache ( & tsid , mr . MetricNameRaw ) {
// Fast path - mr.MetricNameRaw has been already registered.
continue
}
// Slow path - register mr.MetricNameRaw.
2021-05-08 14:55:44 +00:00
if err := mn . UnmarshalRaw ( mr . MetricNameRaw ) ; err != nil {
2020-11-15 22:42:27 +00:00
return fmt . Errorf ( "cannot register the metric because cannot unmarshal MetricNameRaw %q: %w" , mr . MetricNameRaw , err )
}
mn . sortTags ( )
metricName = mn . Marshal ( metricName [ : 0 ] )
if err := is . GetOrCreateTSIDByName ( & tsid , metricName ) ; err != nil {
return fmt . Errorf ( "cannot register the metric because cannot create TSID for metricName %q: %w" , metricName , err )
}
s . putTSIDToCache ( & tsid , mr . MetricNameRaw )
// Register the metric in per-day inverted index.
date := uint64 ( mr . Timestamp ) / msecPerDay
metricID := tsid . MetricID
if s . dateMetricIDCache . Has ( date , metricID ) {
// Fast path: the metric has been already registered in per-day inverted index
continue
}
// Slow path: acutally register the metric in per-day inverted index.
ok , err := is . hasDateMetricID ( date , metricID )
if err != nil {
return fmt . Errorf ( "cannot register the metric in per-date inverted index because of error when locating (date=%d, metricID=%d) in database: %w" ,
date , metricID , err )
}
if ! ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
2021-05-23 13:39:55 +00:00
if err := is . storeDateMetricID ( date , metricID , mn ) ; err != nil {
2020-11-15 22:42:27 +00:00
return fmt . Errorf ( "cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w" ,
date , metricID , err )
}
}
// The metric must be added to cache only after it has been successfully added to indexDB.
s . dateMetricIDCache . Set ( date , metricID )
}
return nil
}
2021-05-24 12:24:04 +00:00
func ( s * Storage ) add ( rows [ ] rawRow , dstMrs [ ] * MetricRow , mrs [ ] MetricRow , precisionBits uint8 ) error {
2019-05-22 21:16:55 +00:00
idb := s . idb ( )
j := 0
2019-12-19 13:12:50 +00:00
var (
2021-03-09 07:18:19 +00:00
// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
2019-12-19 13:12:50 +00:00
prevTSID TSID
prevMetricNameRaw [ ] byte
)
2020-05-14 20:45:04 +00:00
var pmrs * pendingMetricRows
2019-07-11 14:04:56 +00:00
minTimestamp , maxTimestamp := s . tb . getMinMaxTimestamps ( )
2020-05-14 20:17:22 +00:00
// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
2019-05-22 21:16:55 +00:00
for i := range mrs {
mr := & mrs [ i ]
if math . IsNaN ( mr . Value ) {
// Just skip NaNs, since the underlying encoding
// doesn't know how to work with them.
continue
}
2019-07-26 11:10:25 +00:00
if mr . Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
2020-05-14 20:17:22 +00:00
if firstWarn == nil {
2020-11-25 12:41:02 +00:00
metricName := getUserReadableMetricName ( mr . MetricNameRaw )
2020-07-08 10:53:29 +00:00
firstWarn = fmt . Errorf ( "cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d; " +
2020-11-25 12:41:02 +00:00
"probably you need updating -retentionPeriod command-line flag; metricName: %s" ,
mr . Timestamp , minTimestamp , metricName )
2020-05-14 20:17:22 +00:00
}
2019-07-26 11:10:25 +00:00
atomic . AddUint64 ( & s . tooSmallTimestampRows , 1 )
continue
}
if mr . Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
2020-05-14 20:17:22 +00:00
if firstWarn == nil {
2020-11-25 12:41:02 +00:00
metricName := getUserReadableMetricName ( mr . MetricNameRaw )
firstWarn = fmt . Errorf ( "cannot insert row with too big timestamp %d exceeding the current time; maximum allowed timestamp is %d; metricName: %s" ,
mr . Timestamp , maxTimestamp , metricName )
2020-05-14 20:17:22 +00:00
}
2019-07-26 11:10:25 +00:00
atomic . AddUint64 ( & s . tooBigTimestampRows , 1 )
2019-07-11 14:04:56 +00:00
continue
}
2021-05-23 13:39:55 +00:00
dstMrs [ j ] = mr
r := & rows [ j ]
2019-05-22 21:16:55 +00:00
j ++
r . Timestamp = mr . Timestamp
r . Value = mr . Value
r . PrecisionBits = precisionBits
2019-12-19 13:12:50 +00:00
if string ( mr . MetricNameRaw ) == string ( prevMetricNameRaw ) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r . TSID = prevTSID
continue
}
2020-07-06 18:56:14 +00:00
if s . getTSIDFromCache ( & r . TSID , mr . MetricNameRaw ) {
2021-05-20 11:15:19 +00:00
if s . isSeriesCardinalityExceeded ( r . TSID . MetricID , mr . MetricNameRaw ) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j --
continue
}
2021-03-31 18:22:40 +00:00
// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
2020-07-08 14:29:57 +00:00
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
2020-05-14 20:23:39 +00:00
prevTSID = r . TSID
prevMetricNameRaw = mr . MetricNameRaw
continue
2019-05-22 21:16:55 +00:00
}
2020-05-14 20:45:04 +00:00
// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j --
if pmrs == nil {
pmrs = getPendingMetricRows ( )
2019-05-22 21:16:55 +00:00
}
2021-03-31 20:12:56 +00:00
if err := pmrs . addRow ( mr ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = err
}
continue
2019-05-22 21:16:55 +00:00
}
2020-05-14 20:45:04 +00:00
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs . pmrs
sort . Slice ( pendingMetricRows , func ( i , j int ) bool {
return string ( pendingMetricRows [ i ] . MetricName ) < string ( pendingMetricRows [ j ] . MetricName )
} )
2020-07-23 17:42:57 +00:00
is := idb . getIndexSearch ( noDeadline )
2020-05-14 20:45:04 +00:00
prevMetricNameRaw = nil
2020-07-30 13:14:51 +00:00
var slowInsertsCount uint64
2020-05-14 20:45:04 +00:00
for i := range pendingMetricRows {
pmr := & pendingMetricRows [ i ]
2021-05-23 13:39:55 +00:00
mr := pmr . mr
dstMrs [ j ] = mr
r := & rows [ j ]
2020-05-14 20:45:04 +00:00
j ++
r . Timestamp = mr . Timestamp
r . Value = mr . Value
r . PrecisionBits = precisionBits
if string ( mr . MetricNameRaw ) == string ( prevMetricNameRaw ) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r . TSID = prevTSID
continue
2020-05-14 20:17:22 +00:00
}
2020-07-30 13:14:51 +00:00
slowInsertsCount ++
2020-05-14 20:45:04 +00:00
if err := is . GetOrCreateTSIDByName ( & r . TSID , pmr . MetricName ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
2020-06-30 19:58:18 +00:00
firstWarn = fmt . Errorf ( "cannot obtain or create TSID for MetricName %q: %w" , pmr . MetricName , err )
2020-05-14 20:45:04 +00:00
}
j --
continue
}
2021-05-20 11:15:19 +00:00
if s . isSeriesCardinalityExceeded ( r . TSID . MetricID , mr . MetricNameRaw ) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j --
continue
}
2020-05-14 20:45:04 +00:00
s . putTSIDToCache ( & r . TSID , mr . MetricNameRaw )
2021-03-31 18:22:40 +00:00
prevTSID = r . TSID
prevMetricNameRaw = mr . MetricNameRaw
2019-05-22 21:16:55 +00:00
}
2020-05-14 20:45:04 +00:00
idb . putIndexSearch ( is )
putPendingMetricRows ( pmrs )
2020-07-30 13:14:51 +00:00
atomic . AddUint64 ( & s . slowRowInserts , slowInsertsCount )
2019-05-22 21:16:55 +00:00
}
2020-05-14 20:17:22 +00:00
if firstWarn != nil {
2021-02-10 13:56:07 +00:00
logger . Warnf ( "warn occurred during rows addition: %s" , firstWarn )
2019-10-31 12:29:35 +00:00
}
2021-05-23 13:39:55 +00:00
dstMrs = dstMrs [ : j ]
rows = rows [ : j ]
2019-05-22 21:16:55 +00:00
2020-05-14 20:17:22 +00:00
var firstError error
2019-05-22 21:16:55 +00:00
if err := s . tb . AddRows ( rows ) ; err != nil {
2020-06-30 19:58:18 +00:00
firstError = fmt . Errorf ( "cannot add rows to table: %w" , err )
2019-05-22 21:16:55 +00:00
}
2021-05-23 13:39:55 +00:00
if err := s . updatePerDateData ( rows , dstMrs ) ; err != nil && firstError == nil {
2020-06-30 19:58:18 +00:00
firstError = fmt . Errorf ( "cannot update per-date data: %w" , err )
2019-05-22 21:16:55 +00:00
}
2020-05-14 20:17:22 +00:00
if firstError != nil {
2021-05-24 12:24:04 +00:00
return fmt . Errorf ( "error occurred during rows addition: %w" , firstError )
2019-10-20 20:38:51 +00:00
}
2021-05-24 12:24:04 +00:00
return nil
2019-05-22 21:16:55 +00:00
}
2021-05-20 11:15:19 +00:00
func ( s * Storage ) isSeriesCardinalityExceeded ( metricID uint64 , metricNameRaw [ ] byte ) bool {
if sl := s . hourlySeriesLimiter ; sl != nil && ! sl . Add ( metricID ) {
atomic . AddUint64 ( & s . hourlySeriesLimitRowsDropped , 1 )
logSkippedSeries ( metricNameRaw , "-storage.maxHourlySeries" , sl . MaxItems ( ) )
return true
}
if sl := s . dailySeriesLimiter ; sl != nil && ! sl . Add ( metricID ) {
atomic . AddUint64 ( & s . dailySeriesLimitRowsDropped , 1 )
logSkippedSeries ( metricNameRaw , "-storage.maxDailySeries" , sl . MaxItems ( ) )
return true
}
return false
}
func logSkippedSeries ( metricNameRaw [ ] byte , flagName string , flagValue int ) {
select {
case <- logSkippedSeriesTicker . C :
logger . Warnf ( "skip series %s because %s=%d reached" , getUserReadableMetricName ( metricNameRaw ) , flagName , flagValue )
default :
}
}
var logSkippedSeriesTicker = time . NewTicker ( 5 * time . Second )
2020-11-25 12:41:02 +00:00
func getUserReadableMetricName ( metricNameRaw [ ] byte ) string {
2021-05-23 13:39:55 +00:00
mn := GetMetricName ( )
defer PutMetricName ( mn )
2021-05-08 14:55:44 +00:00
if err := mn . UnmarshalRaw ( metricNameRaw ) ; err != nil {
2020-11-25 12:41:02 +00:00
return fmt . Sprintf ( "cannot unmarshal metricNameRaw %q: %s" , metricNameRaw , err )
}
return mn . String ( )
}
2020-05-14 20:45:04 +00:00
type pendingMetricRow struct {
MetricName [ ] byte
2021-05-23 13:39:55 +00:00
mr * MetricRow
2020-05-14 20:45:04 +00:00
}
type pendingMetricRows struct {
pmrs [ ] pendingMetricRow
metricNamesBuf [ ] byte
lastMetricNameRaw [ ] byte
lastMetricName [ ] byte
2021-03-31 20:12:56 +00:00
mn MetricName
2020-05-14 20:45:04 +00:00
}
func ( pmrs * pendingMetricRows ) reset ( ) {
for _ , pmr := range pmrs . pmrs {
pmr . MetricName = nil
2021-05-23 13:39:55 +00:00
pmr . mr = nil
2020-05-14 20:45:04 +00:00
}
pmrs . pmrs = pmrs . pmrs [ : 0 ]
pmrs . metricNamesBuf = pmrs . metricNamesBuf [ : 0 ]
pmrs . lastMetricNameRaw = nil
pmrs . lastMetricName = nil
2021-03-31 20:12:56 +00:00
pmrs . mn . Reset ( )
2020-05-14 20:45:04 +00:00
}
2021-03-31 20:12:56 +00:00
func ( pmrs * pendingMetricRows ) addRow ( mr * MetricRow ) error {
2020-05-14 20:45:04 +00:00
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string ( mr . MetricNameRaw ) != string ( pmrs . lastMetricNameRaw ) {
2021-05-08 14:55:44 +00:00
if err := pmrs . mn . UnmarshalRaw ( mr . MetricNameRaw ) ; err != nil {
2021-03-31 20:12:56 +00:00
return fmt . Errorf ( "cannot unmarshal MetricNameRaw %q: %w" , mr . MetricNameRaw , err )
}
pmrs . mn . sortTags ( )
2020-05-14 20:45:04 +00:00
metricNamesBufLen := len ( pmrs . metricNamesBuf )
2021-03-31 20:12:56 +00:00
pmrs . metricNamesBuf = pmrs . mn . Marshal ( pmrs . metricNamesBuf )
2020-05-14 20:45:04 +00:00
pmrs . lastMetricName = pmrs . metricNamesBuf [ metricNamesBufLen : ]
pmrs . lastMetricNameRaw = mr . MetricNameRaw
}
pmrs . pmrs = append ( pmrs . pmrs , pendingMetricRow {
MetricName : pmrs . lastMetricName ,
2021-05-23 13:39:55 +00:00
mr : mr ,
2020-05-14 20:45:04 +00:00
} )
2021-03-31 20:12:56 +00:00
return nil
2020-05-14 20:45:04 +00:00
}
func getPendingMetricRows ( ) * pendingMetricRows {
v := pendingMetricRowsPool . Get ( )
if v == nil {
v = & pendingMetricRows { }
}
return v . ( * pendingMetricRows )
}
func putPendingMetricRows ( pmrs * pendingMetricRows ) {
pmrs . reset ( )
pendingMetricRowsPool . Put ( pmrs )
}
var pendingMetricRowsPool sync . Pool
2021-05-23 13:39:55 +00:00
func ( s * Storage ) updatePerDateData ( rows [ ] rawRow , mrs [ ] * MetricRow ) error {
2019-05-22 21:16:55 +00:00
var date uint64
2019-06-09 16:06:53 +00:00
var hour uint64
2019-05-22 21:16:55 +00:00
var prevTimestamp int64
2019-12-19 13:12:50 +00:00
var (
2021-03-09 07:18:19 +00:00
// These vars are used for speeding up bulk imports when multiple adjacent rows
2019-12-19 13:12:50 +00:00
// contain the same (metricID, date) pairs.
2020-05-14 20:45:04 +00:00
prevDate uint64
prevMetricID uint64
2019-12-19 13:12:50 +00:00
)
2019-11-08 11:16:40 +00:00
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2021-02-04 16:46:20 +00:00
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
hmPrevDate := hmPrev . hour / 24
2020-05-11 22:06:17 +00:00
nextDayMetricIDs := & s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry ) . v
2020-05-14 19:01:51 +00:00
todayShare16bit := uint64 ( ( float64 ( fasttime . UnixTimestamp ( ) % ( 3600 * 24 ) ) / ( 3600 * 24 ) ) * ( 1 << 16 ) )
2020-05-14 20:45:04 +00:00
type pendingDateMetricID struct {
date uint64
metricID uint64
2021-05-23 13:39:55 +00:00
mr * MetricRow
2020-05-14 20:45:04 +00:00
}
var pendingDateMetricIDs [ ] pendingDateMetricID
2021-02-08 10:00:44 +00:00
var pendingNextDayMetricIDs [ ] uint64
var pendingHourEntries [ ] uint64
2019-05-22 21:16:55 +00:00
for i := range rows {
r := & rows [ i ]
if r . Timestamp != prevTimestamp {
date = uint64 ( r . Timestamp ) / msecPerDay
2019-06-09 16:06:53 +00:00
hour = uint64 ( r . Timestamp ) / msecPerHour
2019-05-22 21:16:55 +00:00
prevTimestamp = r . Timestamp
}
metricID := r . TSID . MetricID
2021-02-09 00:51:40 +00:00
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
2019-06-09 16:06:53 +00:00
if hour == hm . hour {
// The r belongs to the current hour. Check for the current hour cache.
2019-09-24 18:10:22 +00:00
if hm . m . Has ( metricID ) {
2019-06-09 16:06:53 +00:00
// Fast path: the metricID is in the current hour cache.
2019-11-09 21:17:42 +00:00
// This means the metricID has been already added to per-day inverted index.
2020-05-11 22:06:17 +00:00
// Gradually pre-populate per-day inverted index for the next day
// during the current day.
// This should reduce CPU usage spike and slowdown at the beginning of the next day
// when entries for all the active time series must be added to the index.
// This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 .
if todayShare16bit > ( metricID & ( 1 << 16 - 1 ) ) && ! nextDayMetricIDs . Has ( metricID ) {
2020-05-14 20:45:04 +00:00
pendingDateMetricIDs = append ( pendingDateMetricIDs , pendingDateMetricID {
date : date + 1 ,
metricID : metricID ,
2021-05-23 13:39:55 +00:00
mr : mrs [ i ] ,
2020-05-14 20:45:04 +00:00
} )
2021-02-08 10:00:44 +00:00
pendingNextDayMetricIDs = append ( pendingNextDayMetricIDs , metricID )
2020-05-11 22:06:17 +00:00
}
2019-06-02 15:34:08 +00:00
continue
}
2021-02-08 10:00:44 +00:00
pendingHourEntries = append ( pendingHourEntries , metricID )
2021-02-04 16:46:20 +00:00
if date == hmPrevDate && hmPrev . m . Has ( metricID ) {
// The metricID is already registered for the current day on the previous hour.
continue
}
2019-06-02 15:34:08 +00:00
}
// Slower path: check global cache for (date, metricID) entry.
2021-02-09 00:51:40 +00:00
if s . dateMetricIDCache . Has ( date , metricID ) {
2019-12-19 13:12:50 +00:00
continue
}
2021-02-09 00:51:40 +00:00
// Slow path: store the (date, metricID) entry in the indexDB.
pendingDateMetricIDs = append ( pendingDateMetricIDs , pendingDateMetricID {
date : date ,
metricID : metricID ,
2021-05-23 13:39:55 +00:00
mr : mrs [ i ] ,
2021-02-09 00:51:40 +00:00
} )
2020-05-14 20:45:04 +00:00
}
2021-02-08 10:00:44 +00:00
if len ( pendingNextDayMetricIDs ) > 0 {
s . pendingNextDayMetricIDsLock . Lock ( )
s . pendingNextDayMetricIDs . AddMulti ( pendingNextDayMetricIDs )
s . pendingNextDayMetricIDsLock . Unlock ( )
}
if len ( pendingHourEntries ) > 0 {
s . pendingHourEntriesLock . Lock ( )
s . pendingHourEntries . AddMulti ( pendingHourEntries )
s . pendingHourEntriesLock . Unlock ( )
}
2020-05-14 20:45:04 +00:00
if len ( pendingDateMetricIDs ) == 0 {
// Fast path - there are no new (date, metricID) entires in rows.
return nil
}
// Slow path - add new (date, metricID) entries to indexDB.
2020-05-15 10:44:23 +00:00
atomic . AddUint64 ( & s . slowPerDayIndexInserts , uint64 ( len ( pendingDateMetricIDs ) ) )
2020-05-14 20:45:04 +00:00
// Sort pendingDateMetricIDs by (date, metricID) in order to speed up `is` search in the loop below.
sort . Slice ( pendingDateMetricIDs , func ( i , j int ) bool {
a := pendingDateMetricIDs [ i ]
b := pendingDateMetricIDs [ j ]
if a . date != b . date {
return a . date < b . date
}
return a . metricID < b . metricID
} )
idb := s . idb ( )
2020-07-23 17:42:57 +00:00
is := idb . getIndexSearch ( noDeadline )
2020-05-14 20:45:04 +00:00
defer idb . putIndexSearch ( is )
var firstError error
2021-02-09 21:59:14 +00:00
dateMetricIDsForCache := make ( [ ] dateMetricID , 0 , len ( pendingDateMetricIDs ) )
2021-05-23 13:39:55 +00:00
mn := GetMetricName ( )
2021-02-09 21:59:14 +00:00
for _ , dmid := range pendingDateMetricIDs {
date := dmid . date
metricID := dmid . metricID
2020-05-14 20:45:04 +00:00
ok , err := is . hasDateMetricID ( date , metricID )
if err != nil {
2020-05-14 20:17:22 +00:00
if firstError == nil {
2020-06-30 19:58:18 +00:00
firstError = fmt . Errorf ( "error when locating (date=%d, metricID=%d) in database: %w" , date , metricID , err )
2020-05-14 20:17:22 +00:00
}
2019-05-22 21:16:55 +00:00
continue
}
2020-05-14 20:45:04 +00:00
if ! ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
2021-02-09 00:51:40 +00:00
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
2021-05-23 13:39:55 +00:00
if err := mn . UnmarshalRaw ( dmid . mr . MetricNameRaw ) ; err != nil {
if firstError == nil {
firstError = fmt . Errorf ( "cannot unmarshal MetricNameRaw %q: %w" , dmid . mr . MetricNameRaw , err )
}
continue
}
mn . sortTags ( )
if err := is . storeDateMetricID ( date , metricID , mn ) ; err != nil {
2020-05-14 20:45:04 +00:00
if firstError == nil {
2020-06-30 19:58:18 +00:00
firstError = fmt . Errorf ( "error when storing (date=%d, metricID=%d) in database: %w" , date , metricID , err )
2020-05-14 20:45:04 +00:00
}
continue
}
}
2021-02-09 21:59:14 +00:00
dateMetricIDsForCache = append ( dateMetricIDsForCache , dateMetricID {
date : date ,
metricID : metricID ,
} )
2019-05-22 21:16:55 +00:00
}
2021-05-23 13:39:55 +00:00
PutMetricName ( mn )
2021-02-09 21:59:14 +00:00
// The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB.
s . dateMetricIDCache . Store ( dateMetricIDsForCache )
2020-05-14 20:17:22 +00:00
return firstError
2019-05-22 21:16:55 +00:00
}
2019-11-09 21:05:14 +00:00
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
//
// It should be faster than map[date]*uint64set.Set on multicore systems.
type dateMetricIDCache struct {
2019-11-11 11:21:05 +00:00
// 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches.
syncsCount uint64
resetsCount uint64
2019-11-09 21:05:14 +00:00
// Contains immutable map
byDate atomic . Value
// Contains mutable map protected by mu
2021-06-03 13:19:58 +00:00
byDateMutable * byDateMetricIDMap
nextSyncDeadline uint64
mu sync . Mutex
2019-11-09 21:05:14 +00:00
}
func newDateMetricIDCache ( ) * dateMetricIDCache {
var dmc dateMetricIDCache
2021-06-03 13:19:58 +00:00
dmc . resetLocked ( )
2019-11-09 21:05:14 +00:00
return & dmc
}
func ( dmc * dateMetricIDCache ) Reset ( ) {
dmc . mu . Lock ( )
2021-06-03 13:19:58 +00:00
dmc . resetLocked ( )
dmc . mu . Unlock ( )
}
func ( dmc * dateMetricIDCache ) resetLocked ( ) {
2019-11-11 11:21:05 +00:00
// Do not reset syncsCount and resetsCount
2019-11-09 21:05:14 +00:00
dmc . byDate . Store ( newByDateMetricIDMap ( ) )
dmc . byDateMutable = newByDateMetricIDMap ( )
2021-06-03 13:19:58 +00:00
dmc . nextSyncDeadline = 10 + fasttime . UnixTimestamp ( )
2019-11-11 11:21:05 +00:00
atomic . AddUint64 ( & dmc . resetsCount , 1 )
2019-11-09 21:05:14 +00:00
}
func ( dmc * dateMetricIDCache ) EntriesCount ( ) int {
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
n := 0
for _ , e := range byDate . m {
n += e . v . Len ( )
}
return n
}
2019-11-13 15:58:05 +00:00
func ( dmc * dateMetricIDCache ) SizeBytes ( ) uint64 {
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
n := uint64 ( 0 )
for _ , e := range byDate . m {
n += e . v . SizeBytes ( )
}
return n
}
2019-11-09 21:05:14 +00:00
func ( dmc * dateMetricIDCache ) Has ( date , metricID uint64 ) bool {
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
v := byDate . get ( date )
if v . Has ( metricID ) {
// Fast path.
// The majority of calls must go here.
return true
}
// Slow path. Check mutable map.
2019-11-10 20:03:46 +00:00
dmc . mu . Lock ( )
2019-11-09 21:05:14 +00:00
v = dmc . byDateMutable . get ( date )
ok := v . Has ( metricID )
2021-06-03 13:19:58 +00:00
dmc . syncLockedIfNeeded ( )
2019-11-10 20:03:46 +00:00
dmc . mu . Unlock ( )
2019-11-09 21:05:14 +00:00
return ok
}
2021-02-09 21:59:14 +00:00
type dateMetricID struct {
date uint64
metricID uint64
}
func ( dmc * dateMetricIDCache ) Store ( dmids [ ] dateMetricID ) {
var prevDate uint64
metricIDs := make ( [ ] uint64 , 0 , len ( dmids ) )
dmc . mu . Lock ( )
for _ , dmid := range dmids {
if prevDate == dmid . date {
metricIDs = append ( metricIDs , dmid . metricID )
continue
}
if len ( metricIDs ) > 0 {
v := dmc . byDateMutable . getOrCreate ( prevDate )
v . AddMulti ( metricIDs )
}
metricIDs = append ( metricIDs [ : 0 ] , dmid . metricID )
prevDate = dmid . date
}
if len ( metricIDs ) > 0 {
v := dmc . byDateMutable . getOrCreate ( prevDate )
v . AddMulti ( metricIDs )
}
dmc . mu . Unlock ( )
}
2019-11-09 21:05:14 +00:00
func ( dmc * dateMetricIDCache ) Set ( date , metricID uint64 ) {
dmc . mu . Lock ( )
v := dmc . byDateMutable . getOrCreate ( date )
v . Add ( metricID )
dmc . mu . Unlock ( )
}
2021-06-03 13:19:58 +00:00
func ( dmc * dateMetricIDCache ) syncLockedIfNeeded ( ) {
currentTime := fasttime . UnixTimestamp ( )
if currentTime >= dmc . nextSyncDeadline {
dmc . nextSyncDeadline = currentTime + 10
dmc . syncLocked ( )
}
}
func ( dmc * dateMetricIDCache ) syncLocked ( ) {
if len ( dmc . byDateMutable . m ) == 0 {
// Nothing to sync.
return
}
2019-11-09 21:05:14 +00:00
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
2021-06-03 13:19:58 +00:00
byDateMutable := dmc . byDateMutable
for date , e := range byDateMutable . m {
2019-11-09 21:05:14 +00:00
v := byDate . get ( date )
2021-06-03 13:19:58 +00:00
if v == nil {
continue
}
v = v . Clone ( )
v . Union ( & e . v )
byDateMutable . m [ date ] = & byDateMetricIDEntry {
date : date ,
v : * v ,
}
}
for date , e := range byDate . m {
v := byDateMutable . get ( date )
if v != nil {
continue
}
byDateMutable . m [ date ] = e
2019-11-09 21:05:14 +00:00
}
dmc . byDate . Store ( dmc . byDateMutable )
2019-11-11 11:21:05 +00:00
dmc . byDateMutable = newByDateMetricIDMap ( )
2019-11-09 21:05:14 +00:00
2019-11-11 11:21:05 +00:00
atomic . AddUint64 ( & dmc . syncsCount , 1 )
2019-11-09 21:05:14 +00:00
if dmc . EntriesCount ( ) > memory . Allowed ( ) / 128 {
2021-06-03 13:19:58 +00:00
dmc . resetLocked ( )
2019-11-09 21:05:14 +00:00
}
}
type byDateMetricIDMap struct {
hotEntry atomic . Value
m map [ uint64 ] * byDateMetricIDEntry
}
func newByDateMetricIDMap ( ) * byDateMetricIDMap {
dmm := & byDateMetricIDMap {
m : make ( map [ uint64 ] * byDateMetricIDEntry ) ,
}
dmm . hotEntry . Store ( & byDateMetricIDEntry { } )
return dmm
}
func ( dmm * byDateMetricIDMap ) get ( date uint64 ) * uint64set . Set {
hotEntry := dmm . hotEntry . Load ( ) . ( * byDateMetricIDEntry )
if hotEntry . date == date {
// Fast path
return & hotEntry . v
}
// Slow path
e := dmm . m [ date ]
if e == nil {
return nil
}
dmm . hotEntry . Store ( e )
return & e . v
}
func ( dmm * byDateMetricIDMap ) getOrCreate ( date uint64 ) * uint64set . Set {
v := dmm . get ( date )
if v != nil {
return v
}
e := & byDateMetricIDEntry {
date : date ,
}
dmm . m [ date ] = e
return & e . v
}
type byDateMetricIDEntry struct {
date uint64
v uint64set . Set
}
2020-05-11 22:06:17 +00:00
func ( s * Storage ) updateNextDayMetricIDs ( ) {
2020-05-14 19:01:51 +00:00
date := fasttime . UnixDate ( )
2020-05-11 22:06:17 +00:00
e := s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry )
s . pendingNextDayMetricIDsLock . Lock ( )
pendingMetricIDs := s . pendingNextDayMetricIDs
s . pendingNextDayMetricIDs = & uint64set . Set { }
s . pendingNextDayMetricIDsLock . Unlock ( )
if pendingMetricIDs . Len ( ) == 0 && e . date == date {
// Fast path: nothing to update.
return
}
// Slow path: union pendingMetricIDs with e.v
if e . date == date {
pendingMetricIDs . Union ( & e . v )
}
eNew := & byDateMetricIDEntry {
date : date ,
v : * pendingMetricIDs ,
}
s . nextDayMetricIDs . Store ( eNew )
}
2019-06-09 16:06:53 +00:00
func ( s * Storage ) updateCurrHourMetricIDs ( ) {
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-11-08 17:37:16 +00:00
s . pendingHourEntriesLock . Lock ( )
newMetricIDs := s . pendingHourEntries
s . pendingHourEntries = & uint64set . Set { }
s . pendingHourEntriesLock . Unlock ( )
2020-05-14 19:01:51 +00:00
hour := fasttime . UnixHour ( )
2019-11-08 11:16:40 +00:00
if newMetricIDs . Len ( ) == 0 && hm . hour == hour {
2019-06-09 16:06:53 +00:00
// Fast path: nothing to update.
2019-06-02 15:34:08 +00:00
return
}
2019-11-08 17:37:16 +00:00
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
2019-09-24 18:10:22 +00:00
var m * uint64set . Set
2019-06-09 16:06:53 +00:00
isFull := hm . isFull
if hm . hour == hour {
2019-09-24 18:10:22 +00:00
m = hm . m . Clone ( )
2019-06-09 16:06:53 +00:00
} else {
2019-09-24 18:10:22 +00:00
m = & uint64set . Set { }
2019-06-09 16:06:53 +00:00
isFull = true
}
2019-11-03 22:34:24 +00:00
m . Union ( newMetricIDs )
2019-06-09 16:06:53 +00:00
hmNew := & hourMetricIDs {
m : m ,
hour : hour ,
isFull : isFull ,
}
s . currHourMetricIDs . Store ( hmNew )
if hm . hour != hour {
s . prevHourMetricIDs . Store ( hm )
2019-06-02 15:34:08 +00:00
}
}
2019-06-09 16:06:53 +00:00
type hourMetricIDs struct {
2019-09-24 18:10:22 +00:00
m * uint64set . Set
2019-06-09 16:06:53 +00:00
hour uint64
isFull bool
2019-06-02 15:34:08 +00:00
}
2019-05-22 21:16:55 +00:00
func ( s * Storage ) getTSIDFromCache ( dst * TSID , metricName [ ] byte ) bool {
buf := ( * [ unsafe . Sizeof ( * dst ) ] byte ) ( unsafe . Pointer ( dst ) ) [ : ]
buf = s . tsidCache . Get ( buf [ : 0 ] , metricName )
return uintptr ( len ( buf ) ) == unsafe . Sizeof ( * dst )
}
func ( s * Storage ) putTSIDToCache ( tsid * TSID , metricName [ ] byte ) {
buf := ( * [ unsafe . Sizeof ( * tsid ) ] byte ) ( unsafe . Pointer ( tsid ) ) [ : ]
s . tsidCache . Set ( metricName , buf )
}
2021-02-10 12:37:14 +00:00
func openIndexDBTables ( path string , metricIDCache , metricNameCache , tsidCache * workingsetcache . Cache , minTimestampForCompositeIndex int64 ) ( curr , prev * indexDB , err error ) {
2019-05-22 21:16:55 +00:00
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , nil , fmt . Errorf ( "cannot create directory %q: %w" , path , err )
2019-05-22 21:16:55 +00:00
}
d , err := os . Open ( path )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , nil , fmt . Errorf ( "cannot open directory: %w" , err )
2019-05-22 21:16:55 +00:00
}
defer fs . MustClose ( d )
// Search for the two most recent tables - the last one is active,
// the previous one contains backup data.
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , nil , fmt . Errorf ( "cannot read directory: %w" , err )
2019-05-22 21:16:55 +00:00
}
var tableNames [ ] string
for _ , fi := range fis {
if ! fs . IsDirOrSymlink ( fi ) {
// Skip non-directories.
continue
}
tableName := fi . Name ( )
if ! indexDBTableNameRegexp . MatchString ( tableName ) {
// Skip invalid directories.
continue
}
tableNames = append ( tableNames , tableName )
}
sort . Slice ( tableNames , func ( i , j int ) bool {
return tableNames [ i ] < tableNames [ j ]
} )
if len ( tableNames ) < 2 {
// Create missing tables
if len ( tableNames ) == 0 {
prevName := nextIndexDBTableName ( )
tableNames = append ( tableNames , prevName )
}
currName := nextIndexDBTableName ( )
tableNames = append ( tableNames , currName )
}
// Invariant: len(tableNames) >= 2
// Remove all the tables except two last tables.
for _ , tn := range tableNames [ : len ( tableNames ) - 2 ] {
pathToRemove := path + "/" + tn
logger . Infof ( "removing obsolete indexdb dir %q..." , pathToRemove )
2019-06-11 22:53:43 +00:00
fs . MustRemoveAll ( pathToRemove )
2019-05-22 21:16:55 +00:00
logger . Infof ( "removed obsolete indexdb dir %q" , pathToRemove )
}
// Persist changes on the file system.
2019-06-11 20:13:04 +00:00
fs . MustSyncPath ( path )
2019-05-22 21:16:55 +00:00
// Open the last two tables.
currPath := path + "/" + tableNames [ len ( tableNames ) - 1 ]
2021-02-10 12:37:14 +00:00
curr , err = openIndexDB ( currPath , metricIDCache , metricNameCache , tsidCache , minTimestampForCompositeIndex )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , nil , fmt . Errorf ( "cannot open curr indexdb table at %q: %w" , currPath , err )
2019-05-22 21:16:55 +00:00
}
prevPath := path + "/" + tableNames [ len ( tableNames ) - 2 ]
2021-02-10 12:37:14 +00:00
prev , err = openIndexDB ( prevPath , metricIDCache , metricNameCache , tsidCache , minTimestampForCompositeIndex )
2019-05-22 21:16:55 +00:00
if err != nil {
curr . MustClose ( )
2020-06-30 19:58:18 +00:00
return nil , nil , fmt . Errorf ( "cannot open prev indexdb table at %q: %w" , prevPath , err )
2019-05-22 21:16:55 +00:00
}
return curr , prev , nil
}
var indexDBTableNameRegexp = regexp . MustCompile ( "^[0-9A-F]{16}$" )
func nextIndexDBTableName ( ) string {
n := atomic . AddUint64 ( & indexDBTableIdx , 1 )
return fmt . Sprintf ( "%016X" , n )
}
var indexDBTableIdx = uint64 ( time . Now ( ) . UnixNano ( ) )