all: add Windows build for VictoriaMetrics

This commit changes background merge algorithm, so it becomes compatible with Windows file semantics.

The previous algorithm for background merge:

1. Merge source parts into a destination part inside tmp directory.
2. Create a file in txn directory with instructions on how to atomically
   swap source parts with the destination part.
3. Perform instructions from the file.
4. Delete the file with instructions.

This algorithm guarantees that either source parts or destination part
is visible in the partition after unclean shutdown at any step above,
since the remaining files with instructions is replayed on the next restart,
after that the remaining contents of the tmp directory is deleted.

Unfortunately this algorithm doesn't work under Windows because
it disallows removing and moving files, which are in use.

So the new algorithm for background merge has been implemented:

1. Merge source parts into a destination part inside the partition directory itself.
   E.g. now the partition directory may contain both complete and incomplete parts.
2. Atomically update the parts.json file with the new list of parts after the merge,
   e.g. remove the source parts from the list and add the destination part to the list
   before storing it to parts.json file.
3. Remove the source parts from disk when they are no longer used.

This algorithm guarantees that either source parts or destination part
is visible in the partition after unclean shutdown at any step above,
since incomplete partitions from step 1 or old source parts from step 3 are removed
on the next startup by inspecting parts.json file.

This algorithm should work under Windows, since it doesn't remove or move files in use.
This algorithm has also the following benefits:

- It should work better for NFS.
- It fits object storage semantics.

The new algorithm changes data storage format, so it is impossible to downgrade
to the previous versions of VictoriaMetrics after upgrading to this algorithm.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70
This commit is contained in:
Aliaksandr Valialkin 2023-03-19 01:36:05 -07:00
parent 6460475e3b
commit 43b24164ef
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
17 changed files with 510 additions and 826 deletions

View file

@ -186,7 +186,8 @@ release-victoria-metrics: \
release-victoria-metrics-darwin-amd64 \
release-victoria-metrics-darwin-arm64 \
release-victoria-metrics-freebsd-amd64 \
release-victoria-metrics-openbsd-amd64
release-victoria-metrics-openbsd-amd64 \
release-victoria-metrics-windows-amd64
# adds i386 arch
release-victoria-metrics-linux-386:
@ -213,6 +214,9 @@ release-victoria-metrics-freebsd-amd64:
release-victoria-metrics-openbsd-amd64:
GOOS=openbsd GOARCH=amd64 $(MAKE) release-victoria-metrics-goos-goarch
release-victoria-metrics-windows-amd64:
GOARCH=amd64 $(MAKE) release-victoria-metrics-windows-goarch
release-victoria-metrics-goos-goarch: victoria-metrics-$(GOOS)-$(GOARCH)-prod
cd bin && \
tar --transform="flags=r;s|-$(GOOS)-$(GOARCH)||" -czf victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \
@ -222,6 +226,16 @@ release-victoria-metrics-goos-goarch: victoria-metrics-$(GOOS)-$(GOARCH)-prod
| sed s/-$(GOOS)-$(GOARCH)-prod/-prod/ > victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf victoria-metrics-$(GOOS)-$(GOARCH)-prod
release-victoria-metrics-windows-goarch: victoria-metrics-windows-$(GOARCH)-prod
cd bin && \
zip victoria-metrics-windows-$(GOARCH)-$(PKG_TAG).zip \
victoria-metrics-windows-$(GOARCH)-prod.exe \
&& sha256sum victoria-metrics-windows-$(GOARCH)-$(PKG_TAG).zip \
victoria-metrics-windows-$(GOARCH)-prod.exe \
> victoria-metrics-windows-$(GOARCH)-$(PKG_TAG)_checksums.txt
cd bin && rm -rf \
victoria-metrics-windows-$(GOARCH)-prod.exe
release-vmutils: \
release-vmutils-linux-386 \
release-vmutils-linux-amd64 \
@ -314,7 +328,6 @@ release-vmutils-windows-goarch: \
vmauth-windows-$(GOARCH)-prod.exe \
vmctl-windows-$(GOARCH)-prod.exe
pprof-cpu:
go tool pprof -trim_path=github.com/VictoriaMetrics/VictoriaMetrics@ $(PPROF_FILE)

View file

@ -1447,12 +1447,14 @@ can be configured with the `-inmemoryDataFlushInterval` command-line flag (note
In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder,
where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts`
with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`.
Each partition directory contains `parts.json` file with the actual list of parts in the partition.
The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where:
Every `part` directory contains `metadata.json` file with the following fields:
- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `blocksCount` - the number of blocks stored in the part (see details about blocks below)
- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
- `RowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `BlocksCount` - the number of blocks stored in the part (see details about blocks below)
- `MinTimestamp` and `MaxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
- `MinDedupInterval` - the [deduplication interval](#deduplication) applied to the given part.
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`).
Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples),
@ -1474,9 +1476,8 @@ for fast block lookups, which belong to the given `TSID` and cover the given tim
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge
Newly added `parts` either successfully appear in the storage or fail to appear.
The newly added `parts` are being created in a temporary directory under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder.
When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html)
to a temporary directory, then it is atomically moved to the storage directory.
The newly added `part` is atomically registered in the `parts.json` file under the corresponding partition
after it is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) to the storage.
Thanks to this alogrithm, storage never contains partially created parts, even if hardware power off
occurrs in the middle of writing the `part` to disk - such incompletely written `parts`
are automatically deleted on the next VictoriaMetrics start.
@ -1505,8 +1506,7 @@ Retention is configured with the `-retentionPeriod` command-line flag, which tak
Data is split in per-month partitions inside `<-storageDataPath>/data/{small,big}` folders.
Data partitions outside the configured retention are deleted on the first day of the new month.
Each partition consists of one or more data parts with the following name pattern `rowsCount_blocksCount_minTimestamp_maxTimestamp`.
Data parts outside of the configured retention are eventually deleted during
Each partition consists of one or more data parts. Data parts outside of the configured retention are eventually deleted during
[background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
The maximum disk space usage for a given `-retentionPeriod` is going to be (`-retentionPeriod` + 1) months.

View file

@ -39,6 +39,9 @@ victoria-metrics-freebsd-amd64-prod:
victoria-metrics-openbsd-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-openbsd-amd64
victoria-metrics-windows-amd64-prod:
APP_NAME=victoria-metrics $(MAKE) app-via-docker-windows-amd64
package-victoria-metrics:
APP_NAME=victoria-metrics $(MAKE) package-via-docker
@ -100,6 +103,9 @@ victoria-metrics-freebsd-amd64:
victoria-metrics-openbsd-amd64:
APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch
victoria-metrics-windows-amd64:
GOARCH=amd64 APP_NAME=victoria-metrics $(MAKE) app-local-windows-goarch
victoria-metrics-pure:
APP_NAME=victoria-metrics $(MAKE) app-local-pure

View file

@ -15,6 +15,11 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
**Update note: this release contains backwards-incompatible change in storage data format,
so the previous versions of VictoriaMetrics will exit with the `unexpected number of substrings in the part name` error when trying to run them on the data
created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or newer releases**
* FEATURE: publish VictoriaMetrics binaries for Windows. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) issues.
* FEATURE: log metrics with truncated labels if the length of label value in the ingested metric exceeds `-maxLabelValueLen`. This should simplify debugging for this case.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `--kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).

View file

@ -1448,12 +1448,14 @@ can be configured with the `-inmemoryDataFlushInterval` command-line flag (note
In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder,
where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts`
with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`.
Each partition directory contains `parts.json` file with the actual list of parts in the partition.
The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where:
Every `part` directory contains `metadata.json` file with the following fields:
- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `blocksCount` - the number of blocks stored in the part (see details about blocks below)
- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
- `RowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `BlocksCount` - the number of blocks stored in the part (see details about blocks below)
- `MinTimestamp` and `MaxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
- `MinDedupInterval` - the [deduplication interval](#deduplication) applied to the given part.
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`).
Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples),
@ -1475,9 +1477,8 @@ for fast block lookups, which belong to the given `TSID` and cover the given tim
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge
Newly added `parts` either successfully appear in the storage or fail to appear.
The newly added `parts` are being created in a temporary directory under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder.
When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html)
to a temporary directory, then it is atomically moved to the storage directory.
The newly added `part` is atomically registered in the `parts.json` file under the corresponding partition
after it is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) to the storage.
Thanks to this alogrithm, storage never contains partially created parts, even if hardware power off
occurrs in the middle of writing the `part` to disk - such incompletely written `parts`
are automatically deleted on the next VictoriaMetrics start.
@ -1506,8 +1507,7 @@ Retention is configured with the `-retentionPeriod` command-line flag, which tak
Data is split in per-month partitions inside `<-storageDataPath>/data/{small,big}` folders.
Data partitions outside the configured retention are deleted on the first day of the new month.
Each partition consists of one or more data parts with the following name pattern `rowsCount_blocksCount_minTimestamp_maxTimestamp`.
Data parts outside of the configured retention are eventually deleted during
Each partition consists of one or more data parts. Data parts outside of the configured retention are eventually deleted during
[background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
The maximum disk space usage for a given `-retentionPeriod` is going to be (`-retentionPeriod` + 1) months.

View file

@ -1451,12 +1451,14 @@ can be configured with the `-inmemoryDataFlushInterval` command-line flag (note
In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder,
where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts`
with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`.
Each partition directory contains `parts.json` file with the actual list of parts in the partition.
The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where:
Every `part` directory contains `metadata.json` file with the following fields:
- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `blocksCount` - the number of blocks stored in the part (see details about blocks below)
- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
- `RowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part
- `BlocksCount` - the number of blocks stored in the part (see details about blocks below)
- `MinTimestamp` and `MaxTimestamp` - minimum and maximum timestamps across raw samples stored in the part
- `MinDedupInterval` - the [deduplication interval](#deduplication) applied to the given part.
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`).
Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples),
@ -1478,9 +1480,8 @@ for fast block lookups, which belong to the given `TSID` and cover the given tim
and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge
Newly added `parts` either successfully appear in the storage or fail to appear.
The newly added `parts` are being created in a temporary directory under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder.
When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html)
to a temporary directory, then it is atomically moved to the storage directory.
The newly added `part` is atomically registered in the `parts.json` file under the corresponding partition
after it is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) to the storage.
Thanks to this alogrithm, storage never contains partially created parts, even if hardware power off
occurrs in the middle of writing the `part` to disk - such incompletely written `parts`
are automatically deleted on the next VictoriaMetrics start.
@ -1509,8 +1510,7 @@ Retention is configured with the `-retentionPeriod` command-line flag, which tak
Data is split in per-month partitions inside `<-storageDataPath>/data/{small,big}` folders.
Data partitions outside the configured retention are deleted on the first day of the new month.
Each partition consists of one or more data parts with the following name pattern `rowsCount_blocksCount_minTimestamp_maxTimestamp`.
Data parts outside of the configured retention are eventually deleted during
Each partition consists of one or more data parts. Data parts outside of the configured retention are eventually deleted during
[background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
The maximum disk space usage for a given `-retentionPeriod` is going to be (`-retentionPeriod` + 1) months.

View file

@ -143,8 +143,8 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
path = filepath.Clean(path)
if err := bsr.ph.ParseFromPath(path); err != nil {
return fmt.Errorf("cannot parse partHeader data from %q: %w", path, err)
if err := bsr.ph.ReadMetadata(path); err != nil {
return fmt.Errorf("cannot read metadata from %q: %w", path, err)
}
metaindexPath := path + "/metaindex.bin"

View file

@ -2,7 +2,6 @@ package mergeset
import (
"fmt"
"path/filepath"
"sync"
"unsafe"
@ -68,11 +67,9 @@ type part struct {
}
func openFilePart(path string) (*part, error) {
path = filepath.Clean(path)
var ph partHeader
if err := ph.ParseFromPath(path); err != nil {
return nil, fmt.Errorf("cannot parse path to part: %w", err)
if err := ph.ReadMetadata(path); err != nil {
return nil, fmt.Errorf("cannot read part metadata: %w", err)
}
metaindexPath := path + "/metaindex.bin"

View file

@ -5,11 +5,9 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type partHeader struct {
@ -79,50 +77,10 @@ func (ph *partHeader) CopyFrom(src *partHeader) {
ph.lastItem = append(ph.lastItem[:0], src.lastItem...)
}
func (ph *partHeader) ParseFromPath(partPath string) error {
func (ph *partHeader) ReadMetadata(partPath string) error {
ph.Reset()
partPath = filepath.Clean(partPath)
// Extract encoded part name.
n := strings.LastIndexByte(partPath, '/')
if n < 0 {
return fmt.Errorf("cannot find encoded part name in the path %q", partPath)
}
partName := partPath[n+1:]
// PartName must have the following form:
// itemsCount_blocksCount_Garbage
a := strings.Split(partName, "_")
if len(a) != 3 {
return fmt.Errorf("unexpected number of substrings in the part name %q: got %d; want %d", partName, len(a), 3)
}
// Read itemsCount from partName.
itemsCount, err := strconv.ParseUint(a[0], 10, 64)
if err != nil {
return fmt.Errorf("cannot parse itemsCount from partName %q: %w", partName, err)
}
ph.itemsCount = itemsCount
if ph.itemsCount <= 0 {
return fmt.Errorf("part %q cannot contain zero items", partPath)
}
// Read blocksCount from partName.
blocksCount, err := strconv.ParseUint(a[1], 10, 64)
if err != nil {
return fmt.Errorf("cannot parse blocksCount from partName %q: %w", partName, err)
}
ph.blocksCount = blocksCount
if ph.blocksCount <= 0 {
return fmt.Errorf("part %q cannot contain zero blocks", partPath)
}
if ph.blocksCount > ph.itemsCount {
return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d",
partPath, ph.blocksCount, ph.itemsCount)
}
// Read other ph fields from metadata.
// Read ph fields from metadata.
metadataPath := partPath + "/metadata.json"
metadata, err := os.ReadFile(metadataPath)
if err != nil {
@ -133,12 +91,20 @@ func (ph *partHeader) ParseFromPath(partPath string) error {
if err := json.Unmarshal(metadata, &phj); err != nil {
return fmt.Errorf("cannot parse %q: %w", metadataPath, err)
}
if ph.itemsCount != phj.ItemsCount {
return fmt.Errorf("invalid ItemsCount in %q; got %d; want %d", metadataPath, phj.ItemsCount, ph.itemsCount)
if phj.ItemsCount <= 0 {
return fmt.Errorf("part %q cannot contain zero items", partPath)
}
if ph.blocksCount != phj.BlocksCount {
return fmt.Errorf("invalid BlocksCount in %q; got %d; want %d", metadataPath, phj.BlocksCount, ph.blocksCount)
ph.itemsCount = phj.ItemsCount
if phj.BlocksCount <= 0 {
return fmt.Errorf("part %q cannot contain zero blocks", partPath)
}
if phj.BlocksCount > phj.ItemsCount {
return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d",
partPath, phj.BlocksCount, phj.ItemsCount)
}
ph.blocksCount = phj.BlocksCount
ph.firstItem = append(ph.firstItem[:0], phj.FirstItem...)
ph.lastItem = append(ph.lastItem[:0], phj.LastItem...)
@ -146,11 +112,6 @@ func (ph *partHeader) ParseFromPath(partPath string) error {
return nil
}
func (ph *partHeader) Path(tablePath string, suffix uint64) string {
tablePath = filepath.Clean(tablePath)
return fmt.Sprintf("%s/%d_%d_%016X", tablePath, ph.itemsCount, ph.blocksCount, suffix)
}
func (ph *partHeader) WriteMetadata(partPath string) error {
phj := &partHeaderJSON{
ItemsCount: ph.itemsCount,
@ -158,9 +119,9 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
FirstItem: append([]byte{}, ph.firstItem...),
LastItem: append([]byte{}, ph.lastItem...),
}
metadata, err := json.MarshalIndent(&phj, "", "\t")
metadata, err := json.Marshal(&phj)
if err != nil {
return fmt.Errorf("cannot marshal metadata: %w", err)
logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err)
}
metadataPath := partPath + "/metadata.json"
if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {

View file

@ -1,6 +1,7 @@
package mergeset
import (
"encoding/json"
"errors"
"fmt"
"os"
@ -12,7 +13,6 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -141,8 +141,6 @@ type Table struct {
// which may need to be merged.
needMergeCh chan struct{}
snapshotLock sync.RWMutex
flockF *os.File
stopCh chan struct{}
@ -274,7 +272,9 @@ type partWrapper struct {
mp *inmemoryPart
refCount uint64
refCount uint32
mustBeDeleted uint32
isInMerge bool
@ -283,18 +283,22 @@ type partWrapper struct {
}
func (pw *partWrapper) incRef() {
atomic.AddUint64(&pw.refCount, 1)
atomic.AddUint32(&pw.refCount, 1)
}
func (pw *partWrapper) decRef() {
n := atomic.AddUint64(&pw.refCount, ^uint64(0))
if int64(n) < 0 {
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int64(n))
n := atomic.AddUint32(&pw.refCount, ^uint32(0))
if int32(n) < 0 {
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n))
}
if n > 0 {
return
}
deletePath := ""
if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 {
deletePath = pw.p.path
}
if pw.mp != nil {
// Do not return pw.mp to pool via putInmemoryPart(),
// since pw.mp size may be too big compared to other entries stored in the pool.
@ -303,6 +307,10 @@ func (pw *partWrapper) decRef() {
}
pw.p.MustClose()
pw.p = nil
if deletePath != "" {
fs.MustRemoveAll(deletePath)
}
}
// OpenTable opens a table on the given path.
@ -512,7 +520,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.InmemoryBlocksCount += p.ph.blocksCount
m.InmemoryItemsCount += p.ph.itemsCount
m.InmemorySizeBytes += p.size
m.PartsRefCount += atomic.LoadUint64(&pw.refCount)
m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
}
m.FilePartsCount += uint64(len(tb.fileParts))
@ -521,7 +529,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.FileBlocksCount += p.ph.blocksCount
m.FileItemsCount += p.ph.itemsCount
m.FileSizeBytes += p.size
m.PartsRefCount += atomic.LoadUint64(&pw.refCount)
m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
}
tb.partsLock.Unlock()
@ -1074,21 +1082,19 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
// Initialize destination paths.
dstPartType := getDstPartType(pws, isFinal)
tmpPartPath, mergeIdx := tb.getDstPartPaths(dstPartType)
mergeIdx := tb.nextMergeIdx()
dstPartPath := ""
if dstPartType == partFile {
dstPartPath = fmt.Sprintf("%s/%016X", tb.path, mergeIdx)
}
if isFinal && len(pws) == 1 && pws[0].mp != nil {
// Fast path: flush a single in-memory part to disk.
mp := pws[0].mp
if tmpPartPath == "" {
logger.Panicf("BUG: tmpPartPath must be non-empty")
}
if err := mp.StoreToDisk(tmpPartPath); err != nil {
return fmt.Errorf("cannot store in-memory part to %q: %w", tmpPartPath, err)
}
pwNew, err := tb.openCreatedPart(&mp.ph, pws, nil, tmpPartPath, mergeIdx)
if err != nil {
return fmt.Errorf("cannot atomically register the created part: %w", err)
if err := mp.StoreToDisk(dstPartPath); err != nil {
logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err)
}
pwNew := tb.openCreatedPart(pws, nil, dstPartPath)
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
return nil
}
@ -1096,7 +1102,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
// Prepare BlockStreamReaders for source parts.
bsrs, err := openBlockStreamReaders(pws)
if err != nil {
return err
logger.Panicf("FATAL: cannot open source parts for merging: %s", err)
}
closeBlockStreamReaders := func() {
for _, bsr := range bsrs {
@ -1121,45 +1127,30 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
mpNew = &inmemoryPart{}
bsw.InitFromInmemoryPart(mpNew, compressLevel)
} else {
if tmpPartPath == "" {
logger.Panicf("BUG: tmpPartPath must be non-empty")
}
nocache := srcItemsCount > maxItemsPerCachedPart()
if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
closeBlockStreamReaders()
return fmt.Errorf("cannot create destination part at %q: %w", tmpPartPath, err)
if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil {
logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err)
}
}
// Merge source parts to destination part.
ph, err := tb.mergePartsInternal(tmpPartPath, bsw, bsrs, dstPartType, stopCh)
ph, err := tb.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
putBlockStreamWriter(bsw)
closeBlockStreamReaders()
if err != nil {
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
return err
}
if mpNew != nil {
// Update partHeader for destination inmemory part after the merge.
mpNew.ph = *ph
}
// Atomically move the created part from tmpPartPath to its destination
// and swap the source parts with the newly created part.
pwNew, err := tb.openCreatedPart(ph, pws, mpNew, tmpPartPath, mergeIdx)
if err != nil {
return fmt.Errorf("cannot atomically register the created part: %w", err)
}
dstItemsCount := uint64(0)
dstBlocksCount := uint64(0)
dstSize := uint64(0)
dstPartPath := ""
if pwNew != nil {
// Atomically swap the source parts with the newly created part.
pwNew := tb.openCreatedPart(pws, mpNew, dstPartPath)
pDst := pwNew.p
dstItemsCount = pDst.ph.itemsCount
dstBlocksCount = pDst.ph.blocksCount
dstSize = pDst.size
dstPartPath = pDst.path
}
dstItemsCount := pDst.ph.itemsCount
dstBlocksCount := pDst.ph.blocksCount
dstSize := pDst.size
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
@ -1207,19 +1198,6 @@ func getDstPartType(pws []*partWrapper, isFinal bool) partType {
return partInmemory
}
func (tb *Table) getDstPartPaths(dstPartType partType) (string, uint64) {
tmpPartPath := ""
mergeIdx := tb.nextMergeIdx()
switch dstPartType {
case partInmemory:
case partFile:
tmpPartPath = fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx)
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
return tmpPartPath, mergeIdx
}
func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
bsrs := make([]*blockStreamReader, 0, len(pws))
for _, pw := range pws {
@ -1239,7 +1217,7 @@ func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
return bsrs, nil
}
func (tb *Table) mergePartsInternal(tmpPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
var ph partHeader
var itemsMerged *uint64
var mergesCount *uint64
@ -1261,56 +1239,34 @@ func (tb *Table) mergePartsInternal(tmpPartPath string, bsw *blockStreamWriter,
atomic.AddUint64(activeMerges, ^uint64(0))
atomic.AddUint64(mergesCount, 1)
if err != nil {
return nil, fmt.Errorf("cannot merge parts to %q: %w", tmpPartPath, err)
return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
}
if tmpPartPath != "" {
if err := ph.WriteMetadata(tmpPartPath); err != nil {
return nil, fmt.Errorf("cannot write metadata to destination part %q: %w", tmpPartPath, err)
if dstPartPath != "" {
if err := ph.WriteMetadata(dstPartPath); err != nil {
logger.Panicf("FATAL: cannot write metadata to %s: %s", dstPartPath, err)
}
}
return &ph, nil
}
func (tb *Table) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, tmpPartPath string, mergeIdx uint64) (*partWrapper, error) {
dstPartPath := ""
if mpNew == nil || !areAllInmemoryParts(pws) {
// Either source or destination parts are located on disk.
// Create a transaction for atomic deleting of old parts and moving new part to its destination on disk.
var bb bytesutil.ByteBuffer
for _, pw := range pws {
if pw.mp == nil {
fmt.Fprintf(&bb, "%s\n", pw.p.path)
}
}
dstPartPath = ph.Path(tb.path, mergeIdx)
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
return nil, fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
}
// Run the created transaction.
if err := runTransaction(&tb.snapshotLock, tb.path, txnPath); err != nil {
return nil, fmt.Errorf("cannot execute transaction %q: %w", txnPath, err)
}
}
func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper {
// Open the created part.
if mpNew != nil {
// Open the created part from memory.
flushToDiskDeadline := getFlushToDiskDeadline(pws)
pwNew := newPartWrapperFromInmemoryPart(mpNew, flushToDiskDeadline)
return pwNew, nil
return pwNew
}
// Open the created part from disk.
pNew, err := openFilePart(dstPartPath)
if err != nil {
return nil, fmt.Errorf("cannot open merged part %q: %w", dstPartPath, err)
logger.Panicf("FATAL: cannot open the merged part: %s", err)
}
pwNew := &partWrapper{
p: pNew,
refCount: 1,
}
return pwNew, nil
return pwNew
}
func areAllInmemoryParts(pws []*partWrapper) bool {
@ -1335,9 +1291,9 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
removedFileParts := 0
tb.partsLock.Lock()
tb.inmemoryParts, removedInmemoryParts = removeParts(tb.inmemoryParts, m)
tb.fileParts, removedFileParts = removeParts(tb.fileParts, m)
if pwNew != nil {
switch dstPartType {
case partInmemory:
tb.inmemoryParts = append(tb.inmemoryParts, pwNew)
@ -1347,7 +1303,14 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
tb.notifyBackgroundMergers()
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedFileParts > 0 || dstPartType == partFile {
mustWritePartNames(tb.fileParts, tb.path)
}
tb.partsLock.Unlock()
removedParts := removedInmemoryParts + removedFileParts
@ -1355,8 +1318,10 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))
}
// Remove references from old parts.
// Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted.
for _, pw := range pws {
atomic.StoreUint32(&pw.mustBeDeleted, 1)
pw.decRef()
}
}
@ -1412,50 +1377,40 @@ func openParts(path string) ([]*partWrapper, error) {
}
fs.MustRemoveTemporaryDirs(path)
// Run remaining transactions and cleanup /txn and /tmp directories.
// Snapshots cannot be created yet, so use fakeSnapshotLock.
var fakeSnapshotLock sync.RWMutex
if err := runTransactions(&fakeSnapshotLock, path); err != nil {
return nil, fmt.Errorf("cannot run transactions: %w", err)
}
// Remove txn and tmp directories, which may be left after the upgrade
// to v1.90.0 and newer versions.
fs.MustRemoveAll(path + "/txn")
fs.MustRemoveAll(path + "/tmp")
txnDir := path + "/txn"
fs.MustRemoveDirAtomic(txnDir)
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", txnDir, err)
}
partNames := mustReadPartNames(path)
tmpDir := path + "/tmp"
fs.MustRemoveDirAtomic(tmpDir)
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err)
}
fs.MustSyncPath(path)
// Open parts.
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
// or after the update from versions prior to v1.90.0.
des, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("cannot read directory: %w", err)
return nil, fmt.Errorf("cannot read mergetree table dir: %w", err)
}
m := make(map[string]struct{}, len(partNames))
for _, partName := range partNames {
m[partName] = struct{}{}
}
var pws []*partWrapper
for _, de := range des {
if !fs.IsDirOrSymlink(de) {
// Skip non-directories.
continue
}
fn := de.Name()
if isSpecialDir(fn) {
// Skip special dirs.
continue
if _, ok := m[fn]; !ok {
deletePath := path + "/" + fn
fs.MustRemoveAll(deletePath)
}
partPath := path + "/" + fn
if fs.IsEmptyDir(partPath) {
// Remove empty directory, which can be left after unclean shutdown on NFS.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
fs.MustRemoveDirAtomic(partPath)
continue
}
fs.MustSyncPath(path)
// Open parts
var pws []*partWrapper
for _, partName := range partNames {
partPath := path + "/" + partName
p, err := openFilePart(partPath)
if err != nil {
mustCloseParts(pws)
@ -1482,8 +1437,7 @@ func mustCloseParts(pws []*partWrapper) {
// CreateSnapshotAt creates tb snapshot in the given dstDir.
//
// Snapshot is created using linux hard links, so it is usually created
// very quickly.
// Snapshot is created using linux hard links, so it is usually created very quickly.
//
// If deadline is reached before snapshot is created error is returned.
//
@ -1509,36 +1463,27 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
// Flush inmemory items to disk.
tb.flushInmemoryItems()
// The snapshot must be created under the lock in order to prevent from
// concurrent modifications via runTransaction.
tb.snapshotLock.Lock()
defer tb.snapshotLock.Unlock()
if err := fs.MkdirAllFailIfExist(dstDir); err != nil {
return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err)
}
des, err := os.ReadDir(srcDir)
if err != nil {
return fmt.Errorf("cannot read directory: %w", err)
}
pws := tb.getParts(nil)
defer tb.putParts(pws)
for _, de := range des {
// Create a file with part names at dstDir
mustWritePartNames(pws, dstDir)
// Make hardlinks for pws at dstDir
for _, pw := range pws {
if pw.mp != nil {
// Skip in-memory parts
continue
}
if deadline > 0 && fasttime.UnixTimestamp() > deadline {
return fmt.Errorf("cannot create snapshot for %q: timeout exceeded", tb.path)
}
fn := de.Name()
if !fs.IsDirOrSymlink(de) {
// Skip non-directories.
continue
}
if isSpecialDir(fn) {
// Skip special dirs.
continue
}
srcPartPath := srcDir + "/" + fn
dstPartPath := dstDir + "/" + fn
srcPartPath := pw.p.path
dstPartPath := dstDir + "/" + filepath.Base(srcPartPath)
if err := fs.HardLinkFiles(srcPartPath, dstPartPath); err != nil {
return fmt.Errorf("cannot create hard links from %q to %q: %w", srcPartPath, dstPartPath, err)
}
@ -1552,129 +1497,60 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
return nil
}
func runTransactions(txnLock *sync.RWMutex, path string) error {
// Wait until all the previous pending transaction deletions are finished.
pendingTxnDeletionsWG.Wait()
// Make sure all the current transaction deletions are finished before exiting.
defer pendingTxnDeletionsWG.Wait()
txnDir := path + "/txn"
des, err := os.ReadDir(txnDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("cannot read transaction dir: %w", err)
}
// Sort transaction files by id, since transactions must be ordered.
sort.Slice(des, func(i, j int) bool {
return des[i].Name() < des[j].Name()
})
for _, de := range des {
fn := de.Name()
if fs.IsTemporaryFileName(fn) {
// Skip temporary files, which could be left after unclean shutdown.
func mustWritePartNames(pws []*partWrapper, dstDir string) {
partNames := make([]string, 0, len(pws))
for _, pw := range pws {
if pw.mp != nil {
// Skip in-memory parts
continue
}
txnPath := txnDir + "/" + fn
if err := runTransaction(txnLock, path, txnPath); err != nil {
return fmt.Errorf("cannot run transaction from %q: %w", txnPath, err)
partName := filepath.Base(pw.p.path)
partNames = append(partNames, partName)
}
sort.Strings(partNames)
data, err := json.Marshal(partNames)
if err != nil {
logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
}
partNamesPath := dstDir + "/parts.json"
if err := fs.WriteFileAtomically(partNamesPath, data, true); err != nil {
logger.Panicf("FATAL: cannot update %s: %s", partNamesPath, err)
}
return nil
}
func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
// The transaction must run under read lock in order to provide
// consistent snapshots with Table.CreateSnapshot().
txnLock.RLock()
defer txnLock.RUnlock()
data, err := os.ReadFile(txnPath)
func mustReadPartNames(srcDir string) []string {
partNamesPath := srcDir + "/parts.json"
data, err := os.ReadFile(partNamesPath)
if err == nil {
var partNames []string
if err := json.Unmarshal(data, &partNames); err != nil {
logger.Panicf("FATAL: cannot parse %s: %s", partNamesPath, err)
}
return partNames
}
if !os.IsNotExist(err) {
logger.Panicf("FATAL: cannot read parts.json file: %s", err)
}
// The parts.json is missing. This is the upgrade from versions previous to v1.90.0.
// Read part names from directories under srcDir
des, err := os.ReadDir(srcDir)
if err != nil {
return fmt.Errorf("cannot read transaction file: %w", err)
logger.Panicf("FATAL: cannot read mergeset table dir: %s", err)
}
if len(data) > 0 && data[len(data)-1] == '\n' {
data = data[:len(data)-1]
var partNames []string
for _, de := range des {
if !fs.IsDirOrSymlink(de) {
// Skip non-directories.
continue
}
paths := strings.Split(string(data), "\n")
if len(paths) == 0 {
return fmt.Errorf("empty transaction")
partName := de.Name()
if isSpecialDir(partName) {
// Skip special dirs.
continue
}
rmPaths := paths[:len(paths)-1]
mvPaths := strings.Split(paths[len(paths)-1], " -> ")
if len(mvPaths) != 2 {
return fmt.Errorf("invalid last line in the transaction file: got %q; must contain `srcPath -> dstPath`", paths[len(paths)-1])
partNames = append(partNames, partName)
}
// Remove old paths. It is OK if certain paths don't exist.
for _, path := range rmPaths {
path, err := validatePath(pathPrefix, path)
if err != nil {
return fmt.Errorf("invalid path to remove: %w", err)
}
fs.MustRemoveDirAtomic(path)
}
// Move the new part to new directory.
srcPath := mvPaths[0]
dstPath := mvPaths[1]
srcPath, err = validatePath(pathPrefix, srcPath)
if err != nil {
return fmt.Errorf("invalid source path to rename: %w", err)
}
dstPath, err = validatePath(pathPrefix, dstPath)
if err != nil {
return fmt.Errorf("invalid destination path to rename: %w", err)
}
if fs.IsPathExist(srcPath) {
if err := os.Rename(srcPath, dstPath); err != nil {
return fmt.Errorf("cannot rename %q to %q: %w", srcPath, dstPath, err)
}
} else if !fs.IsPathExist(dstPath) {
// Emit info message for the expected condition after unclean shutdown on NFS disk.
// The dstPath part may be missing because it could be already merged into bigger part
// while old source parts for the current txn weren't still deleted due to NFS locks.
logger.Infof("cannot find both source and destination paths: %q -> %q; this may be the case after unclean shutdown (OOM, `kill -9`, hard reset) on NFS disk",
srcPath, dstPath)
}
// Flush pathPrefix directory metadata to the underlying storage.
fs.MustSyncPath(pathPrefix)
pendingTxnDeletionsWG.Add(1)
go func() {
defer pendingTxnDeletionsWG.Done()
if err := os.Remove(txnPath); err != nil {
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
}
}()
return nil
}
var pendingTxnDeletionsWG syncwg.WaitGroup
func validatePath(pathPrefix, path string) (string, error) {
var err error
pathPrefix, err = filepath.Abs(pathPrefix)
if err != nil {
return path, fmt.Errorf("cannot determine absolute path for pathPrefix=%q: %w", pathPrefix, err)
}
path, err = filepath.Abs(path)
if err != nil {
return path, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
}
if !strings.HasPrefix(path, pathPrefix+"/") {
return path, fmt.Errorf("invalid path %q; must start with %q", path, pathPrefix+"/")
}
return path, nil
return partNames
}
// getPartsToMerge returns optimal parts to merge from pws.

View file

@ -141,7 +141,7 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
path = filepath.Clean(path)
if err := bsr.ph.ParseFromPath(path); err != nil {
if err := bsr.ph.ReadMetadata(path); err != nil {
return fmt.Errorf("cannot parse path to part: %w", err)
}

View file

@ -56,8 +56,8 @@ func (mp *inmemoryPart) StoreToDisk(path string) error {
if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
return fmt.Errorf("cannot store metaindex: %w", err)
}
if err := mp.ph.writeMinDedupInterval(path); err != nil {
return fmt.Errorf("cannot store min dedup interval: %w", err)
if err := mp.ph.WriteMetadata(path); err != nil {
return fmt.Errorf("cannot store metadata: %w", err)
}
// Sync parent directory in order to make sure the written files remain visible after hardware reset
parentDirPath := filepath.Dir(path)

View file

@ -50,7 +50,7 @@ func openFilePart(path string) (*part, error) {
path = filepath.Clean(path)
var ph partHeader
if err := ph.ParseFromPath(path); err != nil {
if err := ph.ReadMetadata(path); err != nil {
return nil, fmt.Errorf("cannot parse path to part: %w", err)
}

View file

@ -1,6 +1,7 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
"os"
@ -10,6 +11,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
@ -33,12 +35,35 @@ type partHeader struct {
// String returns string representation of ph.
func (ph *partHeader) String() string {
return fmt.Sprintf("%d_%d_%s_%s", ph.RowsCount, ph.BlocksCount, toUserReadableTimestamp(ph.MinTimestamp), toUserReadableTimestamp(ph.MaxTimestamp))
return fmt.Sprintf("partHeader{rowsCount=%d,blocksCount=%d,minTimestamp=%d,maxTimestamp=%d}", ph.RowsCount, ph.BlocksCount, ph.MinTimestamp, ph.MaxTimestamp)
}
func toUserReadableTimestamp(timestamp int64) string {
t := timestampToTime(timestamp)
return t.Format(userReadableTimeFormat)
// Reset resets the ph.
func (ph *partHeader) Reset() {
ph.RowsCount = 0
ph.BlocksCount = 0
ph.MinTimestamp = (1 << 63) - 1
ph.MaxTimestamp = -1 << 63
ph.MinDedupInterval = 0
}
func (ph *partHeader) readMinDedupInterval(partPath string) error {
filePath := partPath + "/min_dedup_interval"
data, err := os.ReadFile(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// The minimum dedup interval may not exist for old parts.
ph.MinDedupInterval = 0
return nil
}
return fmt.Errorf("cannot read %q: %w", filePath, err)
}
dedupInterval, err := promutils.ParseDuration(string(data))
if err != nil {
return fmt.Errorf("cannot parse minimum dedup interval %q at %q: %w", data, filePath, err)
}
ph.MinDedupInterval = dedupInterval.Milliseconds()
return nil
}
func fromUserReadableTimestamp(s string) (int64, error) {
@ -51,15 +76,6 @@ func fromUserReadableTimestamp(s string) (int64, error) {
const userReadableTimeFormat = "20060102150405.000"
// Path returns a path to part header with the given prefix and suffix.
//
// Suffix must be random.
func (ph *partHeader) Path(prefix string, suffix uint64) string {
prefix = filepath.Clean(prefix)
s := ph.String()
return fmt.Sprintf("%s/%s_%016X", prefix, s, suffix)
}
// ParseFromPath extracts ph info from the given path.
func (ph *partHeader) ParseFromPath(path string) error {
ph.Reset()
@ -119,40 +135,48 @@ func (ph *partHeader) ParseFromPath(path string) error {
return nil
}
// Reset resets the ph.
func (ph *partHeader) Reset() {
ph.RowsCount = 0
ph.BlocksCount = 0
ph.MinTimestamp = (1 << 63) - 1
ph.MaxTimestamp = -1 << 63
ph.MinDedupInterval = 0
}
func (ph *partHeader) ReadMetadata(partPath string) error {
ph.Reset()
func (ph *partHeader) readMinDedupInterval(partPath string) error {
filePath := partPath + "/min_dedup_interval"
data, err := os.ReadFile(filePath)
metadataPath := partPath + "/metadata.json"
metadata, err := os.ReadFile(metadataPath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// The minimum dedup interval may not exist for old parts.
ph.MinDedupInterval = 0
return nil
if os.IsNotExist(err) {
// This is a part created before v1.90.0.
// Fall back to reading the metadata from the partPath itsel
return ph.ParseFromPath(partPath)
}
return fmt.Errorf("cannot read %q: %w", filePath, err)
return fmt.Errorf("cannot read %q: %w", metadataPath, err)
}
dedupInterval, err := promutils.ParseDuration(string(data))
if err != nil {
return fmt.Errorf("cannot parse minimum dedup interval %q at %q: %w", data, filePath, err)
if err := json.Unmarshal(metadata, ph); err != nil {
return fmt.Errorf("cannot parse %q: %w", metadataPath, err)
}
ph.MinDedupInterval = dedupInterval.Milliseconds()
// Perform various checks
if ph.MinTimestamp > ph.MaxTimestamp {
return fmt.Errorf("minTimestamp cannot exceed maxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp)
}
if ph.RowsCount <= 0 {
return fmt.Errorf("rowsCount must be greater than 0; got %d", ph.RowsCount)
}
if ph.BlocksCount <= 0 {
return fmt.Errorf("blocksCount must be greater than 0; got %d", ph.BlocksCount)
}
if ph.BlocksCount > ph.RowsCount {
return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount)
}
return nil
}
func (ph *partHeader) writeMinDedupInterval(partPath string) error {
filePath := partPath + "/min_dedup_interval"
dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
data := dedupInterval.String()
if err := fs.WriteFileAtomically(filePath, []byte(data), false); err != nil {
return fmt.Errorf("cannot create %q: %w", filePath, err)
func (ph *partHeader) WriteMetadata(partPath string) error {
metadata, err := json.Marshal(ph)
if err != nil {
logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err)
}
metadataPath := partPath + "/metadata.json"
if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
return fmt.Errorf("cannot create %q: %w", metadataPath, err)
}
return nil
}

View file

@ -1,62 +0,0 @@
package storage
import (
"testing"
)
func TestPartHeaderParseFromPath(t *testing.T) {
testParseFromPathError := func(path string) {
t.Helper()
var ph partHeader
if err := ph.ParseFromPath(path); err == nil {
t.Fatalf("expecting non-nil error")
}
}
t.Run("Error", func(t *testing.T) {
testParseFromPathError("")
testParseFromPathError("foobar")
testParseFromPathError("/foo/bar")
testParseFromPathError("/rowscount_mintimestamp_maxtimestamp_garbage")
testParseFromPathError("/rowscount_mintimestamp_maxtimestamp_garbage")
testParseFromPathError("/12_3456_mintimestamp_maxtimestamp_garbage")
testParseFromPathError("/12_3456_20181011010203.456_maxtimestamp_garbage")
testParseFromPathError("/12_3456_20181011010203.456_20181011010202.456_garbage")
testParseFromPathError("12_3456_20181011010203.456_20181011010203.457_garbage")
testParseFromPathError("12_3456_20181011010203.456_20181011010203.457_garbage/")
// MinTimestamp > MaxTimetamp
testParseFromPathError("1233_456_20181011010203.456_20181011010202.457_garbage")
// Zero rowsCount
testParseFromPathError("0_123_20181011010203.456_20181011010203.457_garbage")
// Zero blocksCount
testParseFromPathError("123_0_20181011010203.456_20181011010203.457_garbage")
// blocksCount > rowsCount
testParseFromPathError("123_456_20181011010203.456_20181011010203.457_garbage")
})
testParseFromPathSuccess := func(path string, phStringExpected string) {
t.Helper()
var ph partHeader
if err := ph.ParseFromPath(path); err != nil {
t.Fatalf("unexpected error when parsing path %q: %s", path, err)
}
phString := ph.String()
if phString != phStringExpected {
t.Fatalf("unexpected partHeader string for path %q: got %q; want %q", path, phString, phStringExpected)
}
}
t.Run("Success", func(t *testing.T) {
testParseFromPathSuccess("/1233_456_20181011010203.456_20181011010203.457_garbage", "1233_456_20181011010203.456_20181011010203.457")
testParseFromPathSuccess("/1233_456_20181011010203.456_20181011010203.457_garbage/", "1233_456_20181011010203.456_20181011010203.457")
testParseFromPathSuccess("/1233_456_20181011010203.456_20181011010203.457_garbage///", "1233_456_20181011010203.456_20181011010203.457")
testParseFromPathSuccess("/var/lib/tsdb/1233_456_20181011010203.456_20181011010203.457_garbage///", "1233_456_20181011010203.456_20181011010203.457")
testParseFromPathSuccess("/var/lib/tsdb/456_456_20181011010203.456_20181011010203.457_232345///", "456_456_20181011010203.456_20181011010203.457")
})
}

View file

@ -1,6 +1,7 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
"os"
@ -12,7 +13,6 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -20,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
)
// The maximum size of big part.
@ -158,8 +157,6 @@ type partition struct {
// which may need to be merged.
needMergeCh chan struct{}
snapshotLock sync.RWMutex
stopCh chan struct{}
wg sync.WaitGroup
@ -167,11 +164,11 @@ type partition struct {
// partWrapper is a wrapper for the part.
type partWrapper struct {
// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
// The number of references to the part.
refCount uint64
refCount uint32
// The flag, which is set when the part must be deleted after refCount reaches zero.
mustBeDeleted uint32
// The part itself.
p *part
@ -187,24 +184,32 @@ type partWrapper struct {
}
func (pw *partWrapper) incRef() {
atomic.AddUint64(&pw.refCount, 1)
atomic.AddUint32(&pw.refCount, 1)
}
func (pw *partWrapper) decRef() {
n := atomic.AddUint64(&pw.refCount, ^uint64(0))
if int64(n) < 0 {
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int64(n))
n := atomic.AddUint32(&pw.refCount, ^uint32(0))
if int32(n) < 0 {
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n))
}
if n > 0 {
return
}
deletePath := ""
if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 {
deletePath = pw.p.path
}
if pw.mp != nil {
putInmemoryPart(pw.mp)
pw.mp = nil
}
pw.p.MustClose()
pw.p = nil
if deletePath != "" {
fs.MustRemoveAll(deletePath)
}
}
// createPartition creates new partition for the given timestamp and the given paths
@ -215,11 +220,11 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
logger.Infof("creating a partition %q with smallPartsPath=%q, bigPartsPath=%q", name, smallPartsPath, bigPartsPath)
if err := createPartitionDirs(smallPartsPath); err != nil {
return nil, fmt.Errorf("cannot create directories for small parts %q: %w", smallPartsPath, err)
if err := fs.MkdirAllFailIfExist(smallPartsPath); err != nil {
return nil, fmt.Errorf("cannot create directory for small parts %q: %w", smallPartsPath, err)
}
if err := createPartitionDirs(bigPartsPath); err != nil {
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
if err := fs.MkdirAllFailIfExist(bigPartsPath); err != nil {
return nil, fmt.Errorf("cannot create directory for big parts %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
@ -243,8 +248,6 @@ func (pt *partition) startBackgroundWorkers() {
// The pt must be detached from table before calling pt.Drop.
func (pt *partition) Drop() {
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)
// Wait until all the pending transaction deletions are finished before removing partition directories.
pendingTxnDeletionsWG.Wait()
fs.MustRemoveDirAtomic(pt.smallPartsPath)
fs.MustRemoveDirAtomic(pt.bigPartsPath)
@ -266,11 +269,13 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
return nil, fmt.Errorf("patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name)
}
smallParts, err := openParts(smallPartsPath, bigPartsPath, smallPartsPath)
partNamesSmall, partNamesBig := mustReadPartNames(smallPartsPath, bigPartsPath)
smallParts, err := openParts(smallPartsPath, partNamesSmall)
if err != nil {
return nil, fmt.Errorf("cannot open small parts from %q: %w", smallPartsPath, err)
}
bigParts, err := openParts(smallPartsPath, bigPartsPath, bigPartsPath)
bigParts, err := openParts(bigPartsPath, partNamesBig)
if err != nil {
mustCloseParts(smallParts)
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
@ -375,21 +380,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.InmemoryRowsCount += p.ph.RowsCount
m.InmemoryBlocksCount += p.ph.BlocksCount
m.InmemorySizeBytes += p.size
m.InmemoryPartsRefCount += atomic.LoadUint64(&pw.refCount)
m.InmemoryPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
}
for _, pw := range pt.smallParts {
p := pw.p
m.SmallRowsCount += p.ph.RowsCount
m.SmallBlocksCount += p.ph.BlocksCount
m.SmallSizeBytes += p.size
m.SmallPartsRefCount += atomic.LoadUint64(&pw.refCount)
m.SmallPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
}
for _, pw := range pt.bigParts {
p := pw.p
m.BigRowsCount += p.ph.RowsCount
m.BigBlocksCount += p.ph.BlocksCount
m.BigSizeBytes += p.size
m.BigPartsRefCount += atomic.LoadUint64(&pw.refCount)
m.BigPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
}
m.InmemoryPartsCount += uint64(len(pt.inmemoryParts))
@ -751,19 +756,12 @@ func (pt *partition) HasTimestamp(timestamp int64) bool {
func (pt *partition) GetParts(dst []*partWrapper, addInMemory bool) []*partWrapper {
pt.partsLock.Lock()
if addInMemory {
for _, pw := range pt.inmemoryParts {
pw.incRef()
}
incRefForParts(pt.inmemoryParts)
dst = append(dst, pt.inmemoryParts...)
}
for _, pw := range pt.smallParts {
pw.incRef()
}
incRefForParts(pt.smallParts)
dst = append(dst, pt.smallParts...)
for _, pw := range pt.bigParts {
pw.incRef()
}
incRefForParts(pt.bigParts)
dst = append(dst, pt.bigParts...)
pt.partsLock.Unlock()
@ -777,15 +775,18 @@ func (pt *partition) PutParts(pws []*partWrapper) {
}
}
func incRefForParts(pws []*partWrapper) {
for _, pw := range pws {
pw.incRef()
}
}
// MustClose closes the pt, so the app may safely exit.
//
// The pt must be detached from table before calling pt.MustClose.
func (pt *partition) MustClose() {
close(pt.stopCh)
// Wait until all the pending transaction deletions are finished.
pendingTxnDeletionsWG.Wait()
logger.Infof("waiting for service workers to stop on %q...", pt.smallPartsPath)
startTime := time.Now()
pt.wg.Wait()
@ -1277,7 +1278,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
// Initialize destination paths.
dstPartType := pt.getDstPartType(pws, isFinal)
ptPath, tmpPartPath, mergeIdx := pt.getDstPartPaths(dstPartType)
mergeIdx := pt.nextMergeIdx()
dstPartPath := pt.getDstPartPath(dstPartType, mergeIdx)
if dstPartType == partBig {
bigMergeWorkersLimitCh <- struct{}{}
@ -1289,17 +1291,10 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil {
// Fast path: flush a single in-memory part to disk.
mp := pws[0].mp
if tmpPartPath == "" {
logger.Panicf("BUG: tmpPartPath must be non-empty")
}
if err := mp.StoreToDisk(tmpPartPath); err != nil {
return fmt.Errorf("cannot store in-memory part to %q: %w", tmpPartPath, err)
}
pwNew, err := pt.openCreatedPart(&mp.ph, pws, nil, ptPath, tmpPartPath, mergeIdx)
if err != nil {
return fmt.Errorf("cannot atomically register the created part: %w", err)
if err := mp.StoreToDisk(dstPartPath); err != nil {
logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err)
}
pwNew := pt.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
return nil
}
@ -1307,7 +1302,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
// Prepare BlockStreamReaders for source parts.
bsrs, err := openBlockStreamReaders(pws)
if err != nil {
return err
logger.Panicf("FATAL: cannot open source parts for merging: %s", err)
}
closeBlockStreamReaders := func() {
for _, bsr := range bsrs {
@ -1333,45 +1328,38 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
mpNew = getInmemoryPart()
bsw.InitFromInmemoryPart(mpNew, compressLevel)
} else {
if tmpPartPath == "" {
logger.Panicf("BUG: tmpPartPath must be non-empty")
if dstPartPath == "" {
logger.Panicf("BUG: dstPartPath must be non-empty")
}
nocache := dstPartType == partBig
if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
closeBlockStreamReaders()
return fmt.Errorf("cannot create destination part at %q: %w", tmpPartPath, err)
if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil {
logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err)
}
}
// Merge source parts to destination part.
ph, err := pt.mergePartsInternal(tmpPartPath, bsw, bsrs, dstPartType, stopCh)
ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
putBlockStreamWriter(bsw)
closeBlockStreamReaders()
if err != nil {
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
return err
}
if mpNew != nil {
// Update partHeader for destination inmemory part after the merge.
mpNew.ph = *ph
}
// Atomically move the created part from tmpPartPath to its destination
// and swap the source parts with the newly created part.
pwNew, err := pt.openCreatedPart(ph, pws, mpNew, ptPath, tmpPartPath, mergeIdx)
if err != nil {
return fmt.Errorf("cannot atomically register the created part: %w", err)
}
// Atomically swap the source parts with the newly created part.
pwNew := pt.openCreatedPart(ph, pws, mpNew, dstPartPath)
dstRowsCount := uint64(0)
dstBlocksCount := uint64(0)
dstSize := uint64(0)
dstPartPath := ""
if pwNew != nil {
pDst := pwNew.p
dstRowsCount = pDst.ph.RowsCount
dstBlocksCount = pDst.ph.BlocksCount
dstSize = pDst.size
dstPartPath = pDst.String()
}
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
@ -1424,7 +1412,7 @@ func (pt *partition) getDstPartType(pws []*partWrapper, isFinal bool) partType {
return partInmemory
}
func (pt *partition) getDstPartPaths(dstPartType partType) (string, string, uint64) {
func (pt *partition) getDstPartPath(dstPartType partType, mergeIdx uint64) string {
ptPath := ""
switch dstPartType {
case partSmall:
@ -1436,13 +1424,11 @@ func (pt *partition) getDstPartPaths(dstPartType partType) (string, string, uint
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
ptPath = filepath.Clean(ptPath)
mergeIdx := pt.nextMergeIdx()
tmpPartPath := ""
dstPartPath := ""
if dstPartType != partInmemory {
tmpPartPath = fmt.Sprintf("%s/tmp/%016X", ptPath, mergeIdx)
dstPartPath = fmt.Sprintf("%s/%016X", ptPath, mergeIdx)
}
return ptPath, tmpPartPath, mergeIdx
return dstPartPath
}
func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
@ -1464,7 +1450,7 @@ func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
return bsrs, nil
}
func (pt *partition) mergePartsInternal(tmpPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
var ph partHeader
var rowsMerged *uint64
var rowsDeleted *uint64
@ -1495,64 +1481,42 @@ func (pt *partition) mergePartsInternal(tmpPartPath string, bsw *blockStreamWrit
atomic.AddUint64(activeMerges, ^uint64(0))
atomic.AddUint64(mergesCount, 1)
if err != nil {
return nil, fmt.Errorf("cannot merge parts to %q: %w", tmpPartPath, err)
return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
}
if tmpPartPath != "" {
if dstPartPath != "" {
ph.MinDedupInterval = GetDedupInterval()
if err := ph.writeMinDedupInterval(tmpPartPath); err != nil {
return nil, fmt.Errorf("cannot store min dedup interval: %w", err)
if err := ph.WriteMetadata(dstPartPath); err != nil {
logger.Panicf("FATAL: cannot store metadata to %s: %s", dstPartPath, err)
}
}
return &ph, nil
}
func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, ptPath, tmpPartPath string, mergeIdx uint64) (*partWrapper, error) {
dstPartPath := ""
if mpNew == nil || !areAllInmemoryParts(pws) {
// Either source or destination parts are located on disk.
// Create a transaction for atomic deleting of old parts and moving new part to its destination on disk.
var bb bytesutil.ByteBuffer
for _, pw := range pws {
if pw.mp == nil {
fmt.Fprintf(&bb, "%s\n", pw.p.path)
}
}
if ph.RowsCount > 0 {
// The destination part may have no rows if they are deleted during the merge.
dstPartPath = ph.Path(ptPath, mergeIdx)
}
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
return nil, fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
}
// Run the created transaction.
if err := runTransaction(&pt.snapshotLock, pt.smallPartsPath, pt.bigPartsPath, txnPath); err != nil {
return nil, fmt.Errorf("cannot execute transaction %q: %w", txnPath, err)
}
}
func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper {
// Open the created part.
if ph.RowsCount == 0 {
// The created part is empty.
return nil, nil
// The created part is empty. Remove it
if mpNew == nil {
fs.MustRemoveAll(dstPartPath)
}
return nil
}
if mpNew != nil {
// Open the created part from memory.
flushToDiskDeadline := getFlushToDiskDeadline(pws)
pwNew := newPartWrapperFromInmemoryPart(mpNew, flushToDiskDeadline)
return pwNew, nil
return pwNew
}
// Open the created part from disk.
pNew, err := openFilePart(dstPartPath)
if err != nil {
return nil, fmt.Errorf("cannot open merged part %q: %w", dstPartPath, err)
logger.Panicf("FATAL: cannot open merged part %s: %s", dstPartPath, err)
}
pwNew := &partWrapper{
p: pNew,
refCount: 1,
}
return pwNew, nil
return pwNew
}
func areAllInmemoryParts(pws []*partWrapper) bool {
@ -1578,6 +1542,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
removedBigParts := 0
pt.partsLock.Lock()
pt.inmemoryParts, removedInmemoryParts = removeParts(pt.inmemoryParts, m)
pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m)
pt.bigParts, removedBigParts = removeParts(pt.bigParts, m)
@ -1594,6 +1559,14 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
}
pt.notifyBackgroundMergers()
}
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedSmallParts > 0 || removedBigParts > 0 || pwNew != nil && (dstPartType == partSmall || dstPartType == partBig) {
mustWritePartNames(pt.smallParts, pt.bigParts, pt.smallPartsPath)
}
pt.partsLock.Unlock()
removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
@ -1601,8 +1574,10 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))
}
// Remove partition references from old parts.
// Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted.
for _, pw := range pws {
atomic.StoreUint32(&pw.mustBeDeleted, 1)
pw.decRef()
}
}
@ -1672,62 +1647,35 @@ func (pt *partition) stalePartsRemover() {
}
func (pt *partition) removeStaleParts() {
m := make(map[*partWrapper]bool)
startTime := time.Now()
retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs
var pws []*partWrapper
pt.partsLock.Lock()
for _, pw := range pt.inmemoryParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.inmemoryRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
pw.isInMerge = true
pws = append(pws, pw)
}
}
for _, pw := range pt.smallParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
pw.isInMerge = true
pws = append(pws, pw)
}
}
for _, pw := range pt.bigParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
pw.isInMerge = true
pws = append(pws, pw)
}
}
removedInmemoryParts := 0
removedSmallParts := 0
removedBigParts := 0
if len(m) > 0 {
pt.inmemoryParts, removedInmemoryParts = removeParts(pt.inmemoryParts, m)
pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m)
pt.bigParts, removedBigParts = removeParts(pt.bigParts, m)
}
pt.partsLock.Unlock()
removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
if removedParts != len(m) {
logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedParts, len(m))
}
// Physically remove stale parts under snapshotLock in order to provide
// consistent snapshots with table.CreateSnapshot().
pt.snapshotLock.RLock()
for pw := range m {
if pw.mp == nil {
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.s.retentionMsecs/1000)
fs.MustRemoveDirAtomic(pw.p.path)
}
}
// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,
// since they should be automatically called inside fs.MustRemoveDirAtomic().
pt.snapshotLock.RUnlock()
// Remove partition references from removed parts.
for pw := range m {
pw.decRef()
}
pt.swapSrcWithDstParts(pws, nil, partSmall)
}
// getPartsToMerge returns optimal parts to merge from pws.
@ -1868,63 +1816,50 @@ func getPartsSize(pws []*partWrapper) uint64 {
return n
}
func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
func openParts(path string, partNames []string) ([]*partWrapper, error) {
// The path can be missing after restoring from backup, so create it if needed.
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, err
}
fs.MustRemoveTemporaryDirs(path)
// Run remaining transactions and cleanup /txn and /tmp directories.
// Snapshots cannot be created yet, so use fakeSnapshotLock.
var fakeSnapshotLock sync.RWMutex
if err := runTransactions(&fakeSnapshotLock, pathPrefix1, pathPrefix2, path); err != nil {
return nil, fmt.Errorf("cannot run transactions from %q: %w", path, err)
}
// Remove txn and tmp directories, which may be left after the upgrade
// to v1.90.0 and newer versions.
fs.MustRemoveAll(path + "/txn")
fs.MustRemoveAll(path + "/tmp")
txnDir := path + "/txn"
fs.MustRemoveDirAtomic(txnDir)
tmpDir := path + "/tmp"
fs.MustRemoveDirAtomic(tmpDir)
if err := createPartitionDirs(path); err != nil {
return nil, fmt.Errorf("cannot create directories for partition %q: %w", path, err)
}
// Open parts.
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
// or after the update from versions prior to v1.90.0.
des, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("cannot read partition directory: %w", err)
return nil, fmt.Errorf("cannot read partition dir: %w", err)
}
m := make(map[string]struct{}, len(partNames))
for _, partName := range partNames {
m[partName] = struct{}{}
}
var pws []*partWrapper
for _, de := range des {
if !fs.IsDirOrSymlink(de) {
// Skip non-directories.
continue
}
fn := de.Name()
if fn == "snapshots" {
// "snapshots" dir is skipped for backwards compatibility. Now it is unused.
continue
if _, ok := m[fn]; !ok {
deletePath := path + "/" + fn
fs.MustRemoveAll(deletePath)
}
if fn == "tmp" || fn == "txn" {
// Skip special dirs.
continue
}
partPath := path + "/" + fn
if fs.IsEmptyDir(partPath) {
// Remove empty directory, which can be left after unclean shutdown on NFS.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
fs.MustRemoveDirAtomic(partPath)
continue
}
startTime := time.Now()
fs.MustSyncPath(path)
// Open parts
var pws []*partWrapper
for _, partName := range partNames {
partPath := path + "/" + partName
p, err := openFilePart(partPath)
if err != nil {
mustCloseParts(pws)
return nil, fmt.Errorf("cannot open part %q: %w", partPath, err)
}
logger.Infof("opened part %q in %.3f seconds", partPath, time.Since(startTime).Seconds())
pw := &partWrapper{
p: p,
refCount: 1,
@ -1946,8 +1881,7 @@ func mustCloseParts(pws []*partWrapper) {
// CreateSnapshotAt creates pt snapshot at the given smallPath and bigPath dirs.
//
// Snapshot is created using linux hard links, so it is usually created
// very quickly.
// Snapshot is created using linux hard links, so it is usually created very quickly.
func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
logger.Infof("creating partition snapshot of %q and %q...", pt.smallPartsPath, pt.bigPartsPath)
startTime := time.Now()
@ -1955,15 +1889,32 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
// Flush inmemory data to disk.
pt.flushInmemoryRows()
// The snapshot must be created under the lock in order to prevent from
// concurrent modifications via runTransaction.
pt.snapshotLock.Lock()
defer pt.snapshotLock.Unlock()
pt.partsLock.Lock()
incRefForParts(pt.smallParts)
pwsSmall := append([]*partWrapper{}, pt.smallParts...)
incRefForParts(pt.bigParts)
pwsBig := append([]*partWrapper{}, pt.bigParts...)
pt.partsLock.Unlock()
if err := pt.createSnapshot(pt.smallPartsPath, smallPath); err != nil {
defer func() {
pt.PutParts(pwsSmall)
pt.PutParts(pwsBig)
}()
if err := fs.MkdirAllFailIfExist(smallPath); err != nil {
return fmt.Errorf("cannot create snapshot dir %q: %w", smallPath, err)
}
if err := fs.MkdirAllFailIfExist(bigPath); err != nil {
return fmt.Errorf("cannot create snapshot dir %q: %w", bigPath, err)
}
// Create a file with part names at smallPath
mustWritePartNames(pwsSmall, pwsBig, smallPath)
if err := pt.createSnapshot(pt.smallPartsPath, smallPath, pwsSmall); err != nil {
return fmt.Errorf("cannot create snapshot for %q: %w", pt.smallPartsPath, err)
}
if err := pt.createSnapshot(pt.bigPartsPath, bigPath); err != nil {
if err := pt.createSnapshot(pt.bigPartsPath, bigPath, pwsBig); err != nil {
return fmt.Errorf("cannot create snapshot for %q: %w", pt.bigPartsPath, err)
}
@ -1975,202 +1926,115 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
// createSnapshot creates a snapshot from srcDir to dstDir.
//
// The caller is responsible for deleting dstDir if createSnapshot() returns error.
func (pt *partition) createSnapshot(srcDir, dstDir string) error {
if err := fs.MkdirAllFailIfExist(dstDir); err != nil {
return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err)
}
des, err := os.ReadDir(srcDir)
if err != nil {
return fmt.Errorf("cannot read partition directory: %w", err)
}
for _, de := range des {
fn := de.Name()
if !fs.IsDirOrSymlink(de) {
if fn == "appliedRetention.txt" {
// Copy the appliedRetention.txt file to dstDir.
// This file can be created by VictoriaMetrics enterprise.
// See https://docs.victoriametrics.com/#retention-filters .
// Do not make hard link to this file, since it can be modified over time.
srcPath := srcDir + "/" + fn
dstPath := dstDir + "/" + fn
if err := fs.CopyFile(srcPath, dstPath); err != nil {
return fmt.Errorf("cannot copy %q to %q: %w", srcPath, dstPath, err)
}
}
// Skip non-directories.
continue
}
if fn == "tmp" || fn == "txn" || fs.IsScheduledForRemoval(fn) {
// Skip special dirs.
continue
}
srcPartPath := srcDir + "/" + fn
dstPartPath := dstDir + "/" + fn
func (pt *partition) createSnapshot(srcDir, dstDir string, pws []*partWrapper) error {
// Make hardlinks for pws at dstDir
for _, pw := range pws {
srcPartPath := pw.p.path
dstPartPath := dstDir + "/" + filepath.Base(srcPartPath)
if err := fs.HardLinkFiles(srcPartPath, dstPartPath); err != nil {
return fmt.Errorf("cannot create hard links from %q to %q: %w", srcPartPath, dstPartPath, err)
}
}
// Copy the appliedRetention.txt file to dstDir.
// This file can be created by VictoriaMetrics enterprise.
// See https://docs.victoriametrics.com/#retention-filters .
// Do not make hard link to this file, since it can be modified over time.
srcPath := srcDir + "/appliedRetention.txt"
if fs.IsPathExist(srcPath) {
dstPath := dstDir + "/" + filepath.Base(srcPath)
if err := fs.CopyFile(srcPath, dstPath); err != nil {
return fmt.Errorf("cannot copy %q to %q: %w", srcPath, dstPath, err)
}
}
fs.MustSyncPath(dstDir)
fs.MustSyncPath(filepath.Dir(dstDir))
parentDir := filepath.Dir(dstDir)
fs.MustSyncPath(parentDir)
return nil
}
func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path string) error {
// Wait until all the previous pending transaction deletions are finished.
pendingTxnDeletionsWG.Wait()
type partNamesJSON struct {
Small []string
Big []string
}
// Make sure all the current transaction deletions are finished before exiting.
defer pendingTxnDeletionsWG.Wait()
func mustWritePartNames(pwsSmall, pwsBig []*partWrapper, dstDir string) {
partNamesSmall := getPartNames(pwsSmall)
partNamesBig := getPartNames(pwsBig)
partNames := &partNamesJSON{
Small: partNamesSmall,
Big: partNamesBig,
}
data, err := json.Marshal(partNames)
if err != nil {
logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
}
partNamesPath := dstDir + "/parts.json"
if err := fs.WriteFileAtomically(partNamesPath, data, true); err != nil {
logger.Panicf("FATAL: cannot update %s: %s", partNamesPath, err)
}
}
txnDir := path + "/txn"
des, err := os.ReadDir(txnDir)
func getPartNames(pws []*partWrapper) []string {
partNames := make([]string, 0, len(pws))
for _, pw := range pws {
if pw.mp != nil {
// Skip in-memory parts
continue
}
partName := filepath.Base(pw.p.path)
partNames = append(partNames, partName)
}
sort.Strings(partNames)
return partNames
}
func mustReadPartNames(smallPartsPath, bigPartsPath string) ([]string, []string) {
partNamesPath := smallPartsPath + "/parts.json"
data, err := os.ReadFile(partNamesPath)
if err == nil {
var partNames partNamesJSON
if err := json.Unmarshal(data, &partNames); err != nil {
logger.Panicf("FATAL: cannot parse %s: %s", partNamesPath, err)
}
return partNames.Small, partNames.Big
}
if !os.IsNotExist(err) {
logger.Panicf("FATAL: cannot read parts.json file: %s", err)
}
// The parts.json is missing. This is the upgrade from versions previous to v1.90.0.
// Read part names from smallPartsPath and bigPartsPath directories
partNamesSmall := mustReadPartNamesFromDir(smallPartsPath)
partNamesBig := mustReadPartNamesFromDir(bigPartsPath)
return partNamesSmall, partNamesBig
}
func mustReadPartNamesFromDir(srcDir string) []string {
des, err := os.ReadDir(srcDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("cannot read transaction directory: %w", err)
logger.Panicf("FATAL: cannot read partition dir: %s", err)
}
// Sort transaction files by id.
sort.Slice(des, func(i, j int) bool {
return des[i].Name() < des[j].Name()
})
var partNames []string
for _, de := range des {
fn := de.Name()
if fs.IsTemporaryFileName(fn) {
// Skip temporary files, which could be left after unclean shutdown.
if !fs.IsDirOrSymlink(de) {
// Skip non-directories.
continue
}
txnPath := txnDir + "/" + fn
if err := runTransaction(txnLock, pathPrefix1, pathPrefix2, txnPath); err != nil {
return fmt.Errorf("cannot run transaction from %q: %w", txnPath, err)
partName := de.Name()
if isSpecialDir(partName) {
// Skip special dirs.
continue
}
partNames = append(partNames, partName)
}
return nil
return partNames
}
func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath string) error {
// The transaction must run under read lock in order to provide
// consistent snapshots with partition.CreateSnapshot().
txnLock.RLock()
defer txnLock.RUnlock()
data, err := os.ReadFile(txnPath)
if err != nil {
return fmt.Errorf("cannot read transaction file: %w", err)
}
if len(data) > 0 && data[len(data)-1] == '\n' {
data = data[:len(data)-1]
}
paths := strings.Split(string(data), "\n")
if len(paths) == 0 {
return fmt.Errorf("empty transaction")
}
rmPaths := paths[:len(paths)-1]
mvPaths := strings.Split(paths[len(paths)-1], " -> ")
if len(mvPaths) != 2 {
return fmt.Errorf("invalid last line in the transaction file: got %q; must contain `srcPath -> dstPath`", paths[len(paths)-1])
}
// Remove old paths. It is OK if certain paths don't exist.
for _, path := range rmPaths {
path, err := validatePath(pathPrefix1, pathPrefix2, path)
if err != nil {
return fmt.Errorf("invalid path to remove: %w", err)
}
fs.MustRemoveDirAtomic(path)
}
// Move the new part to new directory.
srcPath := mvPaths[0]
dstPath := mvPaths[1]
if len(srcPath) > 0 {
srcPath, err = validatePath(pathPrefix1, pathPrefix2, srcPath)
if err != nil {
return fmt.Errorf("invalid source path to rename: %w", err)
}
if len(dstPath) > 0 {
// Move srcPath to dstPath.
dstPath, err = validatePath(pathPrefix1, pathPrefix2, dstPath)
if err != nil {
return fmt.Errorf("invalid destination path to rename: %w", err)
}
if fs.IsPathExist(srcPath) {
if err := os.Rename(srcPath, dstPath); err != nil {
return fmt.Errorf("cannot rename %q to %q: %w", srcPath, dstPath, err)
}
} else if !fs.IsPathExist(dstPath) {
// Emit info message for the expected condition after unclean shutdown on NFS disk.
// The dstPath part may be missing because it could be already merged into bigger part
// while old source parts for the current txn weren't still deleted due to NFS locks.
logger.Infof("cannot find both source and destination paths: %q -> %q; this may be the case after unclean shutdown "+
"(OOM, `kill -9`, hard reset) on NFS disk", srcPath, dstPath)
}
} else {
// Just remove srcPath.
fs.MustRemoveDirAtomic(srcPath)
}
}
// Flush pathPrefix* directory metadata to the underlying storage,
// so the moved files become visible there.
fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2)
pendingTxnDeletionsWG.Add(1)
go func() {
defer pendingTxnDeletionsWG.Done()
// There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal,
// since it is already called by fs.MustRemoveDirAtomic.
if err := os.Remove(txnPath); err != nil {
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
}
}()
return nil
}
var pendingTxnDeletionsWG syncwg.WaitGroup
func validatePath(pathPrefix1, pathPrefix2, path string) (string, error) {
var err error
pathPrefix1, err = filepath.Abs(pathPrefix1)
if err != nil {
return path, fmt.Errorf("cannot determine absolute path for pathPrefix1=%q: %w", pathPrefix1, err)
}
pathPrefix2, err = filepath.Abs(pathPrefix2)
if err != nil {
return path, fmt.Errorf("cannot determine absolute path for pathPrefix2=%q: %w", pathPrefix2, err)
}
path, err = filepath.Abs(path)
if err != nil {
return path, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
}
if !strings.HasPrefix(path, pathPrefix1+"/") && !strings.HasPrefix(path, pathPrefix2+"/") {
return path, fmt.Errorf("invalid path %q; must start with either %q or %q", path, pathPrefix1+"/", pathPrefix2+"/")
}
return path, nil
}
func createPartitionDirs(path string) error {
path = filepath.Clean(path)
txnPath := path + "/txn"
if err := fs.MkdirAllFailIfExist(txnPath); err != nil {
return fmt.Errorf("cannot create txn directory %q: %w", txnPath, err)
}
tmpPath := path + "/tmp"
if err := fs.MkdirAllFailIfExist(tmpPath); err != nil {
return fmt.Errorf("cannot create tmp directory %q: %w", tmpPath, err)
}
fs.MustSyncPath(path)
return nil
func isSpecialDir(name string) bool {
return name == "tmp" || name == "txn" || name == "snapshots" || fs.IsScheduledForRemoval(name)
}

View file

@ -160,8 +160,8 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
// Do not use fs.MustRemoveDirAtomic() here, since the cache directory may be mounted
// to a separate filesystem. In this case the fs.MustRemoveDirAtomic() will fail while
// Do not use fs.MustRemoveAll() here, since the cache directory may be mounted
// to a separate filesystem. In this case the fs.MustRemoveAll() will fail while
// trying to remove the mount root.
fs.RemoveDirContents(s.cachePath)
logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
@ -2377,7 +2377,7 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
for _, tn := range tableNames[:len(tableNames)-2] {
pathToRemove := path + "/" + tn
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
fs.MustRemoveDirAtomic(pathToRemove)
fs.MustRemoveAll(pathToRemove)
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
}