all: add -inmemoryDataFlushInterval command-line flag for controlling the frequency of saving in-memory data to disk

The main purpose of this command-line flag is to extend the lifetime of low-end flash storage,
which can sustain only a limited number of write operations. Such flash storage is typically
found in Raspberry Pi and similar appliances.

For example, `-inmemoryDataFlushInterval=1h` reduces the frequency of disk write operations
to at most once per hour, provided an hour's worth of ingested data fits within the limit for in-memory data.

The in-memory data is searchable in the same way as the data stored on disk.
VictoriaMetrics automatically flushes the in-memory data to disk on graceful shutdown via SIGINT signal.
The in-memory data is lost on unclean shutdown (hardware power loss, OOM crash, SIGKILL).
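To make the behavior concrete, below is a minimal Go sketch of the flush policy described above - a periodic flush on a configurable interval plus a final flush on SIGINT. It is only an illustration and not the VictoriaMetrics implementation; the print statements stand in for the actual persistence code.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Hypothetical stand-in for -inmemoryDataFlushInterval=1h.
	flushInterval := time.Hour

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT) // graceful shutdown triggers a final flush

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Periodic flush: in-memory data is persisted at most once per interval,
			// which reduces the write load on flash storage.
			fmt.Println("flushing in-memory data to disk")
		case <-stop:
			// Graceful shutdown: persist everything before exiting,
			// so only an unclean shutdown can lose the in-memory data.
			fmt.Println("final flush on SIGINT, exiting")
			return
		}
	}
}
```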

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337
Aliaksandr Valialkin 2022-12-05 15:15:00 -08:00
parent e509552e92
commit 8189770c50
18 changed files with 1833 additions and 976 deletions


@ -1363,18 +1363,50 @@ It is recommended passing different `-promscrape.cluster.name` values to HA pair
## Storage
VictoriaMetrics stores time series data in [MergeTree](https://en.wikipedia.org/wiki/Log-structured_merge-tree)-like
data structures. On insert, VictoriaMetrics accumulates up to 1s of data and dumps it on disk to
`<-storageDataPath>/data/small/YYYY_MM/` subdirectory forming a `part` with the following
name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`. Each part consists of two "columns":
values and timestamps. These are sorted and compressed raw time series values. Additionally, part contains
index files for searching for specific series in the values and timestamps files.
VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`,
which can be searched during queries. The in-memory `parts` are periodically persisted to disk, so they can survive an unclean shutdown
such as an out-of-memory crash, hardware power loss or a `SIGKILL` signal. The interval for flushing the in-memory data to disk
can be configured with the `-inmemoryDataFlushInterval` command-line flag (note that too short a flush interval may significantly increase disk IO).
`Parts` are periodically merged into the bigger parts. The resulting `part` is constructed
under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` subdirectory.
When the resulting `part` is complete, it is atomically moved from the `tmp`
to its own subdirectory, while the source parts are atomically removed. The end result is that the source
parts are substituted by a single resulting bigger `part` in the `<-storageDataPath>/data/{small,big}/YYYY_MM/` directory.
In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder,
where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts`
with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`.
The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where:
- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `blocksCount` - the number of blocks stored in the part (see details about blocks below)
- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
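For illustration, here is a small Go sketch that splits such a directory name into its four fields. It is not the parser used inside VictoriaMetrics, and since the on-disk encoding of the timestamp fields is not spelled out above, they are kept as opaque strings.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partName holds the fields encoded in a part directory name
// of the form rowsCount_blocksCount_minTimestamp_maxTimestamp.
type partName struct {
	RowsCount    uint64
	BlocksCount  uint64
	MinTimestamp string // encoding not specified here, kept as-is
	MaxTimestamp string
}

func parsePartName(s string) (partName, error) {
	fields := strings.Split(s, "_")
	if len(fields) != 4 {
		return partName{}, fmt.Errorf("unexpected part name %q; want 4 underscore-separated fields", s)
	}
	rows, err := strconv.ParseUint(fields[0], 10, 64)
	if err != nil {
		return partName{}, fmt.Errorf("cannot parse rowsCount: %w", err)
	}
	blocks, err := strconv.ParseUint(fields[1], 10, 64)
	if err != nil {
		return partName{}, fmt.Errorf("cannot parse blocksCount: %w", err)
	}
	return partName{
		RowsCount:    rows,
		BlocksCount:  blocks,
		MinTimestamp: fields[2],
		MaxTimestamp: fields[3],
	}, nil
}

func main() {
	pn, err := parsePartName("15000_3_minTs_maxTs") // hypothetical name for illustration
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", pn)
}
```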
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`).
Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples),
which belong to a single [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
Raw samples in each block are sorted by `timestamp`. Blocks for the same time series are sorted
by the `timestamp` of the first sample. Timestamps and values for all the blocks
are stored in [compressed form](https://faun.pub/victoriametrics-achieving-better-compression-for-time-series-data-than-gorilla-317bc1f95932)
in separate files under the `part` directory - `timestamps.bin` and `values.bin`.
The `part` directory also contains `index.bin` and `metaindex.bin` files - these files contain an index
for fast lookups of the blocks that belong to the given `TSID` and cover the given time range.
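The logical layout described above can be sketched with a couple of Go types. The field and type names below are assumptions made for illustration only and do not match the internal VictoriaMetrics structures.

```go
package example

// block groups up to 8K raw samples of a single time series.
// Samples inside a block are sorted by timestamp; on disk the timestamps
// and values end up compressed in timestamps.bin and values.bin.
type block struct {
	tsid       uint64    // simplified stand-in for the internal TSID
	timestamps []int64   // sorted
	values     []float64 // same length as timestamps
}

// part is a collection of blocks sorted by TSID and then by the timestamp
// of the first sample. index.bin and metaindex.bin let the engine quickly
// locate the blocks for a given TSID and time range without scanning everything.
type part struct {
	blocks []block
}
```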
`Parts` are periodically merged into bigger parts in the background. The background merge provides the following benefits:
* keeping the number of data files under control, so it doesn't exceed the limit on open files
* improved data compression, since bigger parts are usually compressed better than smaller parts
* improved query speed, since queries over a smaller number of parts are executed faster
* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge
Newly added `parts` either successfully appear in the storage or fail to appear.
The newly added `parts` are created in a temporary directory under the `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder.
When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html)
to the temporary directory, it is atomically moved to the storage directory.
Thanks to this algorithm, the storage never contains partially created parts, even if a hardware power-off
occurs in the middle of writing the `part` to disk - such incompletely written `parts`
are automatically deleted on the next VictoriaMetrics start.
The same applies to the merge process — `parts` are either fully merged into a new `part` or fail to merge,
leaving the source `parts` untouched.
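A minimal Go sketch of the "write to tmp, fsync, atomically rename" pattern described above is shown below. The paths and the helper name are illustrative only; this is not the VictoriaMetrics code.

```go
package main

import (
	"os"
	"path/filepath"
)

// persistAtomically writes the part files into a temporary directory first
// and then renames the directory into place, so readers never observe a
// partially written part and power loss leaves at most an unused tmp dir.
func persistAtomically(partitionDir, partName string, files map[string][]byte) error {
	tmpDir := filepath.Join(partitionDir, "tmp", partName)
	if err := os.MkdirAll(tmpDir, 0o755); err != nil {
		return err
	}
	for name, data := range files {
		f, err := os.Create(filepath.Join(tmpDir, name))
		if err != nil {
			return err
		}
		if _, err := f.Write(data); err != nil {
			f.Close()
			return err
		}
		if err := f.Sync(); err != nil { // fsync the file contents
			f.Close()
			return err
		}
		if err := f.Close(); err != nil {
			return err
		}
	}
	// Atomic move on the same filesystem.
	if err := os.Rename(tmpDir, filepath.Join(partitionDir, partName)); err != nil {
		return err
	}
	// Sync the parent directory so the rename survives a hardware reset.
	d, err := os.Open(partitionDir)
	if err != nil {
		return err
	}
	defer d.Close()
	return d.Sync()
}

func main() {
	files := map[string][]byte{
		"timestamps.bin": []byte("..."),
		"values.bin":     []byte("..."),
	}
	if err := persistAtomically("data/small/2022_11", "example_part", files); err != nil {
		panic(err)
	}
}
```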
VictoriaMetrics doesn't merge parts if their combined size exceeds the available free disk space.
This prevents potential out-of-disk-space errors during merges.
@ -1383,24 +1415,10 @@ This increases overhead during data querying, since VictoriaMetrics needs to rea
bigger number of parts per each request. That's why it is recommended to have at least 20%
of free disk space under directory pointed by `-storageDataPath` command-line flag.
Information about merging process is available in [single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
and [clustered VictoriaMetrics](https://grafana.com/grafana/dashboards/11176) Grafana dashboards.
Information about the merging process is available in [the dashboard for single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
and [the dashboard for VictoriaMetrics cluster](https://grafana.com/grafana/dashboards/11176).
See more details in [monitoring docs](#monitoring).
The `merge` process improves compression rate and keeps number of `parts` on disk relatively low.
Benefits of doing the merge process are the following:
* it improves query performance, since lower number of `parts` are inspected with each query
* it reduces the number of data files, since each `part` contains fixed number of files
* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge.
Newly added `parts` either appear in the storage or fail to appear.
Storage never contains partially created parts. The same applies to merge process — `parts` are either fully
merged into a new `part` or fail to merge. MergeTree doesn't contain partially merged `parts`.
`Part` contents in MergeTree never change. Parts are immutable. They may be only deleted after the merge
to a bigger `part` or when the `part` contents goes outside the configured `-retentionPeriod`.
See [this article](https://valyala.medium.com/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) for more details.
See also [how to work with snapshots](#how-to-work-with-snapshots).
@ -1723,9 +1741,10 @@ and [cardinality explorer docs](#cardinality-explorer).
* VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage.
This may lead to the following "issues":
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to searchable parts
by requesting `/internal/force_flush` http handler. This handler is mostly needed for testing and debugging purposes.
* The last few seconds of inserted data may be lost on unclean shutdown (i.e. OOM, `kill -9` or hardware reset).
The `-inmemoryDataFlushInterval` command-line flag allows controlling the frequency of in-memory data flush to persistent storage.
See [this article for technical details](https://valyala.medium.com/wal-usage-looks-broken-in-modern-time-series-databases-b62a627ab704).
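For example, the following Go snippet triggers such a flush by calling the `/internal/force_flush` handler; `localhost:8428` is the default single-node listen address and may need adjusting for your setup.

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Force the in-memory buffers to become searchable parts (testing/debugging only).
	resp, err := http.Get("http://localhost:8428/internal/force_flush")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("force_flush status:", resp.Status)
}
```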
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
@ -2133,6 +2152,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if InfluxDB line contains only a single field
-influxTrimTimestamp duration
Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-inmemoryDataFlushInterval duration
The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s)
-insert.maxQueueDuration duration
The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
-logNewSeries


@ -29,6 +29,10 @@ var (
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag")
inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+
"The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. "+
"Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+
"Smaller intervals increase disk IO load. Minimum supported value is 1s")
)
func main() {
@ -54,6 +58,7 @@ func main() {
logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
startTime := time.Now()
storage.SetDedupInterval(*minScrapeInterval)
storage.SetDataFlushInterval(*inmemoryDataFlushInterval)
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vminsert.Init()


@ -100,7 +100,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetLogNewSeries(*logNewSeries)
storage.SetFinalMergeDelay(*finalMergeDelay)
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
storage.SetMergeWorkersCount(*smallMergeConcurrency)
storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.N)
@ -453,56 +453,80 @@ func registerStorageMetrics(strg *storage.Storage) {
return 0
})
metrics.NewGauge(`vm_active_merges{type="storage/big"}`, func() float64 {
return float64(tm().ActiveBigMerges)
metrics.NewGauge(`vm_active_merges{type="storage/inmemory"}`, func() float64 {
return float64(tm().ActiveInmemoryMerges)
})
metrics.NewGauge(`vm_active_merges{type="storage/small"}`, func() float64 {
return float64(tm().ActiveSmallMerges)
})
metrics.NewGauge(`vm_active_merges{type="indexdb"}`, func() float64 {
return float64(idbm().ActiveMerges)
metrics.NewGauge(`vm_active_merges{type="storage/big"}`, func() float64 {
return float64(tm().ActiveBigMerges)
})
metrics.NewGauge(`vm_active_merges{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().ActiveInmemoryMerges)
})
metrics.NewGauge(`vm_active_merges{type="indexdb/file"}`, func() float64 {
return float64(idbm().ActiveFileMerges)
})
metrics.NewGauge(`vm_merges_total{type="storage/big"}`, func() float64 {
return float64(tm().BigMergesCount)
metrics.NewGauge(`vm_merges_total{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryMergesCount)
})
metrics.NewGauge(`vm_merges_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallMergesCount)
})
metrics.NewGauge(`vm_merges_total{type="indexdb"}`, func() float64 {
return float64(idbm().MergesCount)
metrics.NewGauge(`vm_merges_total{type="storage/big"}`, func() float64 {
return float64(tm().BigMergesCount)
})
metrics.NewGauge(`vm_merges_total{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().InmemoryMergesCount)
})
metrics.NewGauge(`vm_merges_total{type="indexdb/file"}`, func() float64 {
return float64(idbm().FileMergesCount)
})
metrics.NewGauge(`vm_rows_merged_total{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsMerged)
metrics.NewGauge(`vm_rows_merged_total{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryRowsMerged)
})
metrics.NewGauge(`vm_rows_merged_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallRowsMerged)
})
metrics.NewGauge(`vm_rows_merged_total{type="indexdb"}`, func() float64 {
return float64(idbm().ItemsMerged)
metrics.NewGauge(`vm_rows_merged_total{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsMerged)
})
metrics.NewGauge(`vm_rows_merged_total{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().InmemoryItemsMerged)
})
metrics.NewGauge(`vm_rows_merged_total{type="indexdb/file"}`, func() float64 {
return float64(idbm().FileItemsMerged)
})
metrics.NewGauge(`vm_rows_deleted_total{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsDeleted)
metrics.NewGauge(`vm_rows_deleted_total{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryRowsDeleted)
})
metrics.NewGauge(`vm_rows_deleted_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallRowsDeleted)
})
metrics.NewGauge(`vm_references{type="storage/big", name="parts"}`, func() float64 {
return float64(tm().BigPartsRefCount)
metrics.NewGauge(`vm_rows_deleted_total{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsDeleted)
})
metrics.NewGauge(`vm_references{type="storage/small", name="parts"}`, func() float64 {
metrics.NewGauge(`vm_part_references{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryPartsRefCount)
})
metrics.NewGauge(`vm_part_references{type="storage/small"}`, func() float64 {
return float64(tm().SmallPartsRefCount)
})
metrics.NewGauge(`vm_references{type="storage", name="partitions"}`, func() float64 {
metrics.NewGauge(`vm_part_references{type="storage/big"}`, func() float64 {
return float64(tm().BigPartsRefCount)
})
metrics.NewGauge(`vm_partition_references{type="storage"}`, func() float64 {
return float64(tm().PartitionsRefCount)
})
metrics.NewGauge(`vm_references{type="indexdb", name="objects"}`, func() float64 {
metrics.NewGauge(`vm_object_references{type="indexdb"}`, func() float64 {
return float64(idbm().IndexDBRefCount)
})
metrics.NewGauge(`vm_references{type="indexdb", name="parts"}`, func() float64 {
metrics.NewGauge(`vm_part_references{type="indexdb"}`, func() float64 {
return float64(idbm().PartsRefCount)
})
@ -531,11 +555,11 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(idbm().CompositeFilterMissingConversions)
})
metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallAssistedMerges)
metrics.NewGauge(`vm_assisted_merges_total{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryAssistedMerges)
})
metrics.NewGauge(`vm_assisted_merges_total{type="indexdb"}`, func() float64 {
return float64(idbm().AssistedMerges)
metrics.NewGauge(`vm_assisted_merges_total{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().AssistedInmemoryMerges)
})
metrics.NewGauge(`vm_indexdb_items_added_total`, func() float64 {
@ -546,11 +570,8 @@ func registerStorageMetrics(strg *storage.Storage) {
})
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
metrics.NewGauge(`vm_merge_need_free_disk_space{type="storage/small"}`, func() float64 {
return float64(tm().SmallMergeNeedFreeDiskSpace)
})
metrics.NewGauge(`vm_merge_need_free_disk_space{type="storage/big"}`, func() float64 {
return float64(tm().BigMergeNeedFreeDiskSpace)
metrics.NewGauge(`vm_merge_need_free_disk_space`, func() float64 {
return float64(tm().MergeNeedFreeDiskSpace)
})
metrics.NewGauge(`vm_pending_rows{type="storage"}`, func() float64 {
@ -560,34 +581,52 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(idbm().PendingItems)
})
metrics.NewGauge(`vm_parts{type="storage/big"}`, func() float64 {
return float64(tm().BigPartsCount)
metrics.NewGauge(`vm_parts{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryPartsCount)
})
metrics.NewGauge(`vm_parts{type="storage/small"}`, func() float64 {
return float64(tm().SmallPartsCount)
})
metrics.NewGauge(`vm_parts{type="indexdb"}`, func() float64 {
return float64(idbm().PartsCount)
metrics.NewGauge(`vm_parts{type="storage/big"}`, func() float64 {
return float64(tm().BigPartsCount)
})
metrics.NewGauge(`vm_parts{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().InmemoryPartsCount)
})
metrics.NewGauge(`vm_parts{type="indexdb/file"}`, func() float64 {
return float64(idbm().FilePartsCount)
})
metrics.NewGauge(`vm_blocks{type="storage/big"}`, func() float64 {
return float64(tm().BigBlocksCount)
metrics.NewGauge(`vm_blocks{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryBlocksCount)
})
metrics.NewGauge(`vm_blocks{type="storage/small"}`, func() float64 {
return float64(tm().SmallBlocksCount)
})
metrics.NewGauge(`vm_blocks{type="indexdb"}`, func() float64 {
return float64(idbm().BlocksCount)
metrics.NewGauge(`vm_blocks{type="storage/big"}`, func() float64 {
return float64(tm().BigBlocksCount)
})
metrics.NewGauge(`vm_blocks{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().InmemoryBlocksCount)
})
metrics.NewGauge(`vm_blocks{type="indexdb/file"}`, func() float64 {
return float64(idbm().FileBlocksCount)
})
metrics.NewGauge(`vm_data_size_bytes{type="storage/big"}`, func() float64 {
return float64(tm().BigSizeBytes)
metrics.NewGauge(`vm_data_size_bytes{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemorySizeBytes)
})
metrics.NewGauge(`vm_data_size_bytes{type="storage/small"}`, func() float64 {
return float64(tm().SmallSizeBytes)
})
metrics.NewGauge(`vm_data_size_bytes{type="indexdb"}`, func() float64 {
return float64(idbm().SizeBytes)
metrics.NewGauge(`vm_data_size_bytes{type="storage/big"}`, func() float64 {
return float64(tm().BigSizeBytes)
})
metrics.NewGauge(`vm_data_size_bytes{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().InmemorySizeBytes)
})
metrics.NewGauge(`vm_data_size_bytes{type="indexdb/file"}`, func() float64 {
return float64(idbm().FileSizeBytes)
})
metrics.NewGauge(`vm_rows_added_to_storage_total`, func() float64 {
@ -665,14 +704,20 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().TimestampsBytesSaved)
})
metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsCount)
metrics.NewGauge(`vm_rows{type="storage/inmemory"}`, func() float64 {
return float64(tm().InmemoryRowsCount)
})
metrics.NewGauge(`vm_rows{type="storage/small"}`, func() float64 {
return float64(tm().SmallRowsCount)
})
metrics.NewGauge(`vm_rows{type="indexdb"}`, func() float64 {
return float64(idbm().ItemsCount)
metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsCount)
})
metrics.NewGauge(`vm_rows{type="indexdb/inmemory"}`, func() float64 {
return float64(idbm().InmemoryItemsCount)
})
metrics.NewGauge(`vm_rows{type="indexdb/file"}`, func() float64 {
return float64(idbm().FileItemsCount)
})
metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 {


@ -17,6 +17,34 @@ The following tip changes can be tested by building VictoriaMetrics components f
**Update note 1:** this release drops support for direct upgrade from VictoriaMetrics versions prior to [v1.28.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.28.0). Please upgrade to `v1.84.0`, wait until the `finished round 2 of background conversion` line is emitted to the log by single-node VictoriaMetrics or by `vmstorage`, and then upgrade to newer releases.
**Update note 2:** this release splits `type="indexdb"` metrics into `type="indexdb/inmemory"` and `type="indexdb/file"` metrics. This may break old dashboards and alerting rules, which contain label filters on `{type="indexdb"}`. It is recommended upgrading to the latest available dashboards and alerting rules mentioned in [these docs](https://docs.victoriametrics.com/#monitoring).
* FEATURE: add the `-inmemoryDataFlushInterval` command-line flag, which can be used for controlling the frequency of flushing in-memory data to disk. The flush frequency can be reduced when VictoriaMetrics stores data on a low-end flash device with a limited number of write cycles (for example, on a Raspberry Pi). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337).
* FEATURE: expose additional metrics for `indexdb` and `storage` parts stored in memory and for `indexdb` parts stored in files (see [storage docs](https://docs.victoriametrics.com/#storage) for technical details):
* `vm_active_merges{type="storage/inmemory"}` - active merges for in-memory `storage` parts
* `vm_active_merges{type="indexdb/inmemory"}` - active merges for in-memory `indexdb` parts
* `vm_active_merges{type="indexdb/file"}` - active merges for file-based `indexdb` parts
* `vm_merges_total{type="storage/inmemory"}` - the total merges for in-memory `storage` parts
* `vm_merges_total{type="indexdb/inmemory"}` - the total merges for in-memory `indexdb` parts
* `vm_merges_total{type="indexdb/file"}` - the total merges for file-based `indexdb` parts
* `vm_rows_merged_total{type="storage/inmemory"}` - the total rows merged for in-memory `storage` parts
* `vm_rows_merged_total{type="indexdb/inmemory"}` - the total rows merged for in-memory `indexdb` parts
* `vm_rows_merged_total{type="indexdb/file"}` - the total rows merged for file-based `indexdb` parts
* `vm_rows_deleted_total{type="storage/inmemory"}` - the total rows deleted for in-memory `storage` parts
* `vm_assisted_merges_total{type="storage/inmemory"}` - the total number of assisted merges for in-memory `storage` parts
* `vm_assisted_merges_total{type="indexdb/inmemory"}` - the total number of assisted merges for in-memory `indexdb` parts
* `vm_parts{type="storage/inmemory"}` - the total number of in-memory `storage` parts
* `vm_parts{type="indexdb/inmemory"}` - the total number of in-memory `indexdb` parts
* `vm_parts{type="indexdb/file"}` - the total number of file-based `indexdb` parts
* `vm_blocks{type="storage/inmemory"}` - the total number of in-memory `storage` blocks
* `vm_blocks{type="indexdb/inmemory"}` - the total number of in-memory `indexdb` blocks
* `vm_blocks{type="indexdb/file"}` - the total number of file-based `indexdb` blocks
* `vm_data_size_bytes{type="storage/inmemory"}` - the total size of in-memory `storage` blocks
* `vm_data_size_bytes{type="indexdb/inmemory"}` - the total size of in-memory `indexdb` blocks
* `vm_data_size_bytes{type="indexdb/file"}` - the total size of file-based `indexdb` blocks
* `vm_rows{type="storage/inmemory"}` - the total number of in-memory `storage` rows
* `vm_rows{type="indexdb/inmemory"}` - the total number of in-memory `indexdb` rows
* `vm_rows{type="indexdb/file"}` - the total number of file-based `indexdb` rows
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [service discovery](https://docs.victoriametrics.com/sd_configs.html) performance when discovering big number of targets (10K and more).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents corruption of automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402).


@ -1364,18 +1364,50 @@ It is recommended passing different `-promscrape.cluster.name` values to HA pair
## Storage
VictoriaMetrics stores time series data in [MergeTree](https://en.wikipedia.org/wiki/Log-structured_merge-tree)-like
data structures. On insert, VictoriaMetrics accumulates up to 1s of data and dumps it on disk to
`<-storageDataPath>/data/small/YYYY_MM/` subdirectory forming a `part` with the following
name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`. Each part consists of two "columns":
values and timestamps. These are sorted and compressed raw time series values. Additionally, part contains
index files for searching for specific series in the values and timestamps files.
VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`,
which can be searched during queries. The in-memory `parts` are periodically persisted to disk, so they can survive an unclean shutdown
such as an out-of-memory crash, hardware power loss or a `SIGKILL` signal. The interval for flushing the in-memory data to disk
can be configured with the `-inmemoryDataFlushInterval` command-line flag (note that too short a flush interval may significantly increase disk IO).
`Parts` are periodically merged into the bigger parts. The resulting `part` is constructed
under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` subdirectory.
When the resulting `part` is complete, it is atomically moved from the `tmp`
to its own subdirectory, while the source parts are atomically removed. The end result is that the source
parts are substituted by a single resulting bigger `part` in the `<-storageDataPath>/data/{small,big}/YYYY_MM/` directory.
In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder,
where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts`
with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`.
The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where:
- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `blocksCount` - the number of blocks stored in the part (see details about blocks below)
- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`).
Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples),
which belong to a single [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
Raw samples in each block are sorted by `timestamp`. Blocks for the same time series are sorted
by the `timestamp` of the first sample. Timestamps and values for all the blocks
are stored in [compressed form](https://faun.pub/victoriametrics-achieving-better-compression-for-time-series-data-than-gorilla-317bc1f95932)
in separate files under the `part` directory - `timestamps.bin` and `values.bin`.
The `part` directory also contains `index.bin` and `metaindex.bin` files - these files contain an index
for fast lookups of the blocks that belong to the given `TSID` and cover the given time range.
`Parts` are periodically merged into bigger parts in the background. The background merge provides the following benefits:
* keeping the number of data files under control, so it doesn't exceed the limit on open files
* improved data compression, since bigger parts are usually compressed better than smaller parts
* improved query speed, since queries over a smaller number of parts are executed faster
* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge
Newly added `parts` either successfully appear in the storage or fail to appear.
The newly added `parts` are created in a temporary directory under the `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder.
When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html)
to the temporary directory, it is atomically moved to the storage directory.
Thanks to this algorithm, the storage never contains partially created parts, even if a hardware power-off
occurs in the middle of writing the `part` to disk - such incompletely written `parts`
are automatically deleted on the next VictoriaMetrics start.
The same applies to the merge process — `parts` are either fully merged into a new `part` or fail to merge,
leaving the source `parts` untouched.
VictoriaMetrics doesn't merge parts if their combined size exceeds the available free disk space.
This prevents potential out-of-disk-space errors during merges.
@ -1384,24 +1416,10 @@ This increases overhead during data querying, since VictoriaMetrics needs to rea
bigger number of parts per each request. That's why it is recommended to have at least 20%
of free disk space under directory pointed by `-storageDataPath` command-line flag.
Information about merging process is available in [single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
and [clustered VictoriaMetrics](https://grafana.com/grafana/dashboards/11176) Grafana dashboards.
Information about the merging process is available in [the dashboard for single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
and [the dashboard for VictoriaMetrics cluster](https://grafana.com/grafana/dashboards/11176).
See more details in [monitoring docs](#monitoring).
The `merge` process improves compression rate and keeps number of `parts` on disk relatively low.
Benefits of doing the merge process are the following:
* it improves query performance, since lower number of `parts` are inspected with each query
* it reduces the number of data files, since each `part` contains fixed number of files
* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge.
Newly added `parts` either appear in the storage or fail to appear.
Storage never contains partially created parts. The same applies to merge process — `parts` are either fully
merged into a new `part` or fail to merge. MergeTree doesn't contain partially merged `parts`.
`Part` contents in MergeTree never change. Parts are immutable. They may be only deleted after the merge
to a bigger `part` or when the `part` contents goes outside the configured `-retentionPeriod`.
See [this article](https://valyala.medium.com/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) for more details.
See also [how to work with snapshots](#how-to-work-with-snapshots).
@ -1724,9 +1742,10 @@ and [cardinality explorer docs](#cardinality-explorer).
* VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage.
This may lead to the following "issues":
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to searchable parts
by requesting `/internal/force_flush` http handler. This handler is mostly needed for testing and debugging purposes.
* The last few seconds of inserted data may be lost on unclean shutdown (i.e. OOM, `kill -9` or hardware reset).
The `-inmemoryDataFlushInterval` command-line flag allows controlling the frequency of in-memory data flush to persistent storage.
See [this article for technical details](https://valyala.medium.com/wal-usage-looks-broken-in-modern-time-series-databases-b62a627ab704).
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
@ -2134,6 +2153,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if InfluxDB line contains only a single field
-influxTrimTimestamp duration
Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-inmemoryDataFlushInterval duration
The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s)
-insert.maxQueueDuration duration
The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
-logNewSeries


@ -1367,18 +1367,50 @@ It is recommended passing different `-promscrape.cluster.name` values to HA pair
## Storage
VictoriaMetrics stores time series data in [MergeTree](https://en.wikipedia.org/wiki/Log-structured_merge-tree)-like
data structures. On insert, VictoriaMetrics accumulates up to 1s of data and dumps it on disk to
`<-storageDataPath>/data/small/YYYY_MM/` subdirectory forming a `part` with the following
name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`. Each part consists of two "columns":
values and timestamps. These are sorted and compressed raw time series values. Additionally, part contains
index files for searching for specific series in the values and timestamps files.
VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`,
which can be searched during queries. The in-memory `parts` are periodically persisted to disk, so they can survive an unclean shutdown
such as an out-of-memory crash, hardware power loss or a `SIGKILL` signal. The interval for flushing the in-memory data to disk
can be configured with the `-inmemoryDataFlushInterval` command-line flag (note that too short a flush interval may significantly increase disk IO).
`Parts` are periodically merged into the bigger parts. The resulting `part` is constructed
under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` subdirectory.
When the resulting `part` is complete, it is atomically moved from the `tmp`
to its own subdirectory, while the source parts are atomically removed. The end result is that the source
parts are substituted by a single resulting bigger `part` in the `<-storageDataPath>/data/{small,big}/YYYY_MM/` directory.
In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder,
where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts`
with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`.
The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where:
- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `blocksCount` - the number of blocks stored in the part (see details about blocks below)
- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`).
Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples),
which belong to a single [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
Raw samples in each block are sorted by `timestamp`. Blocks for the same time series are sorted
by the `timestamp` of the first sample. Timestamps and values for all the blocks
are stored in [compressed form](https://faun.pub/victoriametrics-achieving-better-compression-for-time-series-data-than-gorilla-317bc1f95932)
in separate files under the `part` directory - `timestamps.bin` and `values.bin`.
The `part` directory also contains `index.bin` and `metaindex.bin` files - these files contain an index
for fast lookups of the blocks that belong to the given `TSID` and cover the given time range.
`Parts` are periodically merged into bigger parts in the background. The background merge provides the following benefits:
* keeping the number of data files under control, so it doesn't exceed the limit on open files
* improved data compression, since bigger parts are usually compressed better than smaller parts
* improved query speed, since queries over a smaller number of parts are executed faster
* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge
Newly added `parts` either successfully appear in the storage or fail to appear.
The newly added `parts` are created in a temporary directory under the `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder.
When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html)
to the temporary directory, it is atomically moved to the storage directory.
Thanks to this algorithm, the storage never contains partially created parts, even if a hardware power-off
occurs in the middle of writing the `part` to disk - such incompletely written `parts`
are automatically deleted on the next VictoriaMetrics start.
The same applies to the merge process — `parts` are either fully merged into a new `part` or fail to merge,
leaving the source `parts` untouched.
VictoriaMetrics doesn't merge parts if their combined size exceeds the available free disk space.
This prevents potential out-of-disk-space errors during merges.
@ -1387,24 +1419,10 @@ This increases overhead during data querying, since VictoriaMetrics needs to rea
bigger number of parts per each request. That's why it is recommended to have at least 20%
of free disk space under directory pointed by `-storageDataPath` command-line flag.
Information about merging process is available in [single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
and [clustered VictoriaMetrics](https://grafana.com/grafana/dashboards/11176) Grafana dashboards.
Information about the merging process is available in [the dashboard for single-node VictoriaMetrics](https://grafana.com/dashboards/10229)
and [the dashboard for VictoriaMetrics cluster](https://grafana.com/grafana/dashboards/11176).
See more details in [monitoring docs](#monitoring).
The `merge` process improves compression rate and keeps number of `parts` on disk relatively low.
Benefits of doing the merge process are the following:
* it improves query performance, since lower number of `parts` are inspected with each query
* it reduces the number of data files, since each `part` contains fixed number of files
* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge.
Newly added `parts` either appear in the storage or fail to appear.
Storage never contains partially created parts. The same applies to merge process — `parts` are either fully
merged into a new `part` or fail to merge. MergeTree doesn't contain partially merged `parts`.
`Part` contents in MergeTree never change. Parts are immutable. They may be only deleted after the merge
to a bigger `part` or when the `part` contents goes outside the configured `-retentionPeriod`.
See [this article](https://valyala.medium.com/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) for more details.
See also [how to work with snapshots](#how-to-work-with-snapshots).
@ -1727,9 +1745,10 @@ and [cardinality explorer docs](#cardinality-explorer).
* VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage.
This may lead to the following "issues":
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage
* Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to searchable parts
by requesting `/internal/force_flush` http handler. This handler is mostly needed for testing and debugging purposes.
* The last few seconds of inserted data may be lost on unclean shutdown (i.e. OOM, `kill -9` or hardware reset).
The `-inmemoryDataFlushInterval` command-line flag allows controlling the frequency of in-memory data flush to persistent storage.
See [this article for technical details](https://valyala.medium.com/wal-usage-looks-broken-in-modern-time-series-databases-b62a627ab704).
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
@ -2137,6 +2156,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if InfluxDB line contains only a single field
-influxTrimTimestamp duration
Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-inmemoryDataFlushInterval duration
The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s)
-insert.maxQueueDuration duration
The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
-logNewSeries


@ -1,8 +1,12 @@
package mergeset
import (
"fmt"
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -28,6 +32,36 @@ func (mp *inmemoryPart) Reset() {
mp.lensData.Reset()
}
// StoreToDisk stores mp to the given path on disk.
func (mp *inmemoryPart) StoreToDisk(path string) error {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return fmt.Errorf("cannot create directory %q: %w", path, err)
}
metaindexPath := path + "/metaindex.bin"
if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
return fmt.Errorf("cannot store metaindex: %w", err)
}
indexPath := path + "/index.bin"
if err := fs.WriteFileAndSync(indexPath, mp.indexData.B); err != nil {
return fmt.Errorf("cannot store index: %w", err)
}
itemsPath := path + "/items.bin"
if err := fs.WriteFileAndSync(itemsPath, mp.itemsData.B); err != nil {
return fmt.Errorf("cannot store items: %w", err)
}
lensPath := path + "/lens.bin"
if err := fs.WriteFileAndSync(lensPath, mp.lensData.B); err != nil {
return fmt.Errorf("cannot store lens: %w", err)
}
if err := mp.ph.WriteMetadata(path); err != nil {
return fmt.Errorf("cannot store metadata: %w", err)
}
// Sync parent directory in order to make sure the written files remain visible after hardware reset
parentDirPath := filepath.Dir(path)
fs.MustSyncPath(parentDirPath)
return nil
}
// Init initializes mp from ib.
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
mp.Reset()

File diff suppressed because it is too large


@ -90,8 +90,8 @@ func TestTableAddItemsSerial(t *testing.T) {
var m TableMetrics
tb.UpdateMetrics(&m)
if m.ItemsCount != itemsCount {
t.Fatalf("unexpected itemsCount; got %d; want %v", m.ItemsCount, itemsCount)
if n := m.TotalItemsCount(); n != itemsCount {
t.Fatalf("unexpected itemsCount; got %d; want %v", n, itemsCount)
}
tb.MustClose()
@ -235,8 +235,8 @@ func TestTableAddItemsConcurrent(t *testing.T) {
var m TableMetrics
tb.UpdateMetrics(&m)
if m.ItemsCount != itemsCount {
t.Fatalf("unexpected itemsCount; got %d; want %v", m.ItemsCount, itemsCount)
if n := m.TotalItemsCount(); n != itemsCount {
t.Fatalf("unexpected itemsCount; got %d; want %v", n, itemsCount)
}
tb.MustClose()
@ -292,8 +292,8 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
}
var m TableMetrics
tb.UpdateMetrics(&m)
if m.ItemsCount != uint64(itemsCount) {
t.Fatalf("unexpected itemsCount after re-opening; got %d; want %v", m.ItemsCount, itemsCount)
if n := m.TotalItemsCount(); n != uint64(itemsCount) {
t.Fatalf("unexpected itemsCount after re-opening; got %d; want %v", n, itemsCount)
}
tb.MustClose()
}


@ -1480,8 +1480,8 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
// verify the storage contains rows.
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < uint64(metricRowsN) {
t.Fatalf("expecting at least %d rows in the table; got %d", metricRowsN, m.TableMetrics.SmallRowsCount)
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < uint64(metricRowsN) {
t.Fatalf("expecting at least %d rows in the table; got %d", metricRowsN, rowsCount)
}
// check new series were registered in indexDB


@ -1,9 +1,13 @@
package storage
import (
"fmt"
"path/filepath"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -31,6 +35,36 @@ func (mp *inmemoryPart) Reset() {
mp.creationTime = 0
}
// StoreToDisk stores the mp to the given path on disk.
func (mp *inmemoryPart) StoreToDisk(path string) error {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return fmt.Errorf("cannot create directory %q: %w", path, err)
}
timestampsPath := path + "/timestamps.bin"
if err := fs.WriteFileAndSync(timestampsPath, mp.timestampsData.B); err != nil {
return fmt.Errorf("cannot store timestamps: %w", err)
}
valuesPath := path + "/values.bin"
if err := fs.WriteFileAndSync(valuesPath, mp.valuesData.B); err != nil {
return fmt.Errorf("cannot store values: %w", err)
}
indexPath := path + "/index.bin"
if err := fs.WriteFileAndSync(indexPath, mp.indexData.B); err != nil {
return fmt.Errorf("cannot store index: %w", err)
}
metaindexPath := path + "/metaindex.bin"
if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
return fmt.Errorf("cannot store metaindex: %w", err)
}
if err := mp.ph.writeMinDedupInterval(path); err != nil {
return fmt.Errorf("cannot store min dedup interval: %w", err)
}
// Sync parent directory in order to make sure the written files remain visible after hardware reset
parentDirPath := filepath.Dir(path)
fs.MustSyncPath(parentDirPath)
return nil
}
// InitFromRows initializes mp from the given rows.
func (mp *inmemoryPart) InitFromRows(rows []rawRow) {
if len(rows) == 0 {

File diff suppressed because it is too large


@ -181,11 +181,12 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
t.Fatalf("cannot remove big parts directory: %s", err)
}
}()
var tmpRows []rawRow
for _, rows := range rowss {
pt.AddRows(rows)
// Flush just added rows to a separate partition.
pt.flushPendingRows(true)
// Flush the just-added rows to separate partitions.
tmpRows = pt.flushPendingRows(tmpRows[:0], true)
}
testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1)
pt.MustClose()
@ -232,8 +233,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
// due to the race with raw rows flusher.
var m partitionMetrics
pt.UpdateMetrics(&m)
rowsCount := m.BigRowsCount + m.SmallRowsCount
if rowsCount != uint64(rowsCountExpected) {
if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {
return fmt.Errorf("unexpected rows count; got %d; want %d", rowsCount, rowsCountExpected)
}
}
@ -258,8 +258,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
if rowsCountExpected >= 0 {
var m partitionMetrics
pt.UpdateMetrics(&m)
rowsCount := m.BigRowsCount + m.SmallRowsCount
if rowsCount != uint64(rowsCountExpected) {
if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {
return fmt.Errorf("unexpected rows count after search; got %d; want %d", rowsCount, rowsCountExpected)
}
}


@ -454,7 +454,7 @@ func TestStorageOpenMultipleTimes(t *testing.T) {
func TestStorageRandTimestamps(t *testing.T) {
path := "TestStorageRandTimestamps"
retentionMsecs := int64(60 * msecsPerMonth)
retentionMsecs := int64(10 * msecsPerMonth)
s, err := OpenStorage(path, retentionMsecs, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
@ -462,10 +462,13 @@ func TestStorageRandTimestamps(t *testing.T) {
t.Run("serial", func(t *testing.T) {
for i := 0; i < 3; i++ {
if err := testStorageRandTimestamps(s); err != nil {
t.Fatal(err)
t.Fatalf("error on iteration %d: %s", i, err)
}
s.MustClose()
s, err = OpenStorage(path, retentionMsecs, 0, 0)
if err != nil {
t.Fatalf("cannot open storage on iteration %d: %s", i, err)
}
}
})
t.Run("concurrent", func(t *testing.T) {
@ -479,14 +482,15 @@ func TestStorageRandTimestamps(t *testing.T) {
ch <- err
}()
}
tt := time.NewTimer(time.Second * 10)
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatal(err)
t.Fatalf("error on iteration %d: %s", i, err)
}
case <-time.After(time.Second * 10):
t.Fatal("timeout")
case <-tt.C:
t.Fatalf("timeout on iteration %d", i)
}
}
})
@ -497,9 +501,9 @@ func TestStorageRandTimestamps(t *testing.T) {
}
func testStorageRandTimestamps(s *Storage) error {
const rowsPerAdd = 1e3
const addsCount = 2
typ := reflect.TypeOf(int64(0))
currentTime := timestampFromTime(time.Now())
const rowsPerAdd = 5e3
const addsCount = 3
rnd := rand.New(rand.NewSource(1))
for i := 0; i < addsCount; i++ {
@ -512,15 +516,8 @@ func testStorageRandTimestamps(s *Storage) error {
for j := 0; j < rowsPerAdd; j++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100)))
metricNameRaw := mn.marshalRaw(nil)
timestamp := int64(rnd.NormFloat64() * 1e12)
if j%2 == 0 {
ts, ok := quick.Value(typ, rnd)
if !ok {
return fmt.Errorf("cannot create random timestamp via quick.Value")
}
timestamp = ts.Interface().(int64)
}
value := rnd.NormFloat64() * 1e12
timestamp := currentTime - int64((rnd.Float64()-0.2)*float64(2*s.retentionMsecs))
value := rnd.NormFloat64() * 1e11
mr := MetricRow{
MetricNameRaw: metricNameRaw,
@ -540,8 +537,8 @@ func testStorageRandTimestamps(s *Storage) error {
// Verify the storage contains rows.
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount == 0 {
return fmt.Errorf("expecting at least one row in the table")
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount == 0 {
return fmt.Errorf("expecting at least one row in storage")
}
return nil
}
@ -592,14 +589,15 @@ func TestStorageDeleteSeries(t *testing.T) {
ch <- err
}(i)
}
tt := time.NewTimer(30 * time.Second)
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
t.Fatalf("unexpected error on iteration %d: %s", i, err)
}
case <-time.After(30 * time.Second):
t.Fatalf("timeout")
case <-tt.C:
t.Fatalf("timeout on iteration %d", i)
}
}
})
@ -932,7 +930,8 @@ func testStorageRegisterMetricNames(s *Storage) error {
func TestStorageAddRowsSerial(t *testing.T) {
path := "TestStorageAddRowsSerial"
s, err := OpenStorage(path, 0, 1e5, 1e5)
retentionMsecs := int64(msecsPerMonth * 10)
s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@ -947,7 +946,8 @@ func TestStorageAddRowsSerial(t *testing.T) {
func TestStorageAddRowsConcurrent(t *testing.T) {
path := "TestStorageAddRowsConcurrent"
s, err := OpenStorage(path, 0, 1e5, 1e5)
retentionMsecs := int64(msecsPerMonth * 10)
s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@ -1000,8 +1000,10 @@ func testStorageAddRows(s *Storage) error {
const rowsPerAdd = 1e3
const addsCount = 10
maxTimestamp := timestampFromTime(time.Now())
minTimestamp := maxTimestamp - s.retentionMsecs
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
mrs := testGenerateMetricRows(rowsPerAdd, minTimestamp, maxTimestamp)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
@ -1011,8 +1013,8 @@ func testStorageAddRows(s *Storage) error {
minRowsExpected := uint64(rowsPerAdd * addsCount)
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, m.TableMetrics.SmallRowsCount)
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
}
// Try creating a snapshot from the storage.
@ -1040,8 +1042,8 @@ func testStorageAddRows(s *Storage) error {
// Verify the snapshot contains rows
var m1 Metrics
s1.UpdateMetrics(&m1)
if m1.TableMetrics.SmallRowsCount < minRowsExpected {
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount)
if rowsCount := m1.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, rowsCount)
}
// Verify that force merge for the snapshot leaves only a single part per partition.
@ -1155,22 +1157,25 @@ func testStorageAddMetrics(s *Storage, workerNum int) error {
minRowsExpected := uint64(rowsCount)
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, m.TableMetrics.SmallRowsCount)
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
}
return nil
}
func TestStorageDeleteStaleSnapshots(t *testing.T) {
path := "TestStorageDeleteStaleSnapshots"
s, err := OpenStorage(path, 0, 1e5, 1e5)
retentionMsecs := int64(msecsPerMonth * 10)
s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
const rowsPerAdd = 1e3
const addsCount = 10
maxTimestamp := timestampFromTime(time.Now())
minTimestamp := maxTimestamp - s.retentionMsecs
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
mrs := testGenerateMetricRows(rowsPerAdd, minTimestamp, maxTimestamp)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}


@ -215,15 +215,16 @@ func (tb *table) MustClose() {
}
}
// flushPendingRows flushes all the pending rows, so they become visible to search.
// flushPendingRows flushes all the pending raw rows, so they become visible to search.
//
// This function is for debug purposes only.
func (tb *table) flushPendingRows() {
ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws)
var rows []rawRow
for _, ptw := range ptws {
ptw.pt.flushPendingRows(true)
rows = ptw.pt.flushPendingRows(rows[:0], true)
}
}
@ -524,7 +525,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) (
func populatePartitionNames(partitionsPath string, ptNames map[string]bool) error {
d, err := os.Open(partitionsPath)
if err != nil {
return fmt.Errorf("cannot open directory with partitions %q: %w", partitionsPath, err)
return fmt.Errorf("cannot open directory with partitions: %w", err)
}
defer fs.MustClose(d)


@ -35,7 +35,7 @@ func TestTableSearch(t *testing.T) {
MinTimestamp: trData.MinTimestamp + 4e3,
MaxTimestamp: trData.MaxTimestamp - 4e3,
}
testTableSearchEx(t, trData, trSearch, 12, 100, 1, 10)
testTableSearchEx(t, trData, trSearch, 12, 20, 1, 10)
})
t.Run("SingleTSID", func(t *testing.T) {
@ -51,7 +51,7 @@ func TestTableSearch(t *testing.T) {
MinTimestamp: trData.MinTimestamp + 4e3,
MaxTimestamp: trData.MaxTimestamp - 4e3,
}
testTableSearchEx(t, trData, trSearch, 60, 20, 30, 20)
testTableSearchEx(t, trData, trSearch, 20, 10, 30, 20)
})
t.Run("ManyTSIDs", func(t *testing.T) {
@ -244,8 +244,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
// they may race with raw rows flusher.
var m TableMetrics
tb.UpdateMetrics(&m)
rowsCount := m.BigRowsCount + m.SmallRowsCount
if rowsCount != uint64(rowsCountExpected) {
if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {
return fmt.Errorf("unexpected rows count in the table; got %d; want %d", rowsCount, rowsCountExpected)
}
}
@ -270,8 +269,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
if rowsCountExpected >= 0 {
var m TableMetrics
tb.UpdateMetrics(&m)
rowsCount := m.BigRowsCount + m.SmallRowsCount
if rowsCount != uint64(rowsCountExpected) {
if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {
return fmt.Errorf("unexpected rows count in the table; got %d; want %d", rowsCount, rowsCountExpected)
}
}


@ -55,9 +55,8 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
rowsCountExpected := insertsCount * uint64(rowsPerInsert)
var m TableMetrics
tb.UpdateMetrics(&m)
rowsCountActual := m.BigRowsCount + m.SmallRowsCount
if rowsCountActual != rowsCountExpected {
b.Fatalf("unexpected rows count in the table %q; got %d; want %d", path, rowsCountActual, rowsCountExpected)
if rowsCount := m.TotalRowsCount(); rowsCount != rowsCountExpected {
b.Fatalf("unexpected rows count in the table %q; got %d; want %d", path, rowsCount, rowsCountExpected)
}
return tb


@ -101,8 +101,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
}
var m TableMetrics
tb.UpdateMetrics(&m)
rowsCount := m.BigRowsCount + m.SmallRowsCount
if rowsCount != uint64(rowsCountExpected) {
if rowsCount := m.TotalRowsCount(); rowsCount != uint64(rowsCountExpected) {
b.Fatalf("unexpected rows count in the final table %q: got %d; want %d", tablePath, rowsCount, rowsCountExpected)
}
tb.MustClose()