diff --git a/Makefile b/Makefile index 31875c24f..3cbb1bb7a 100644 --- a/Makefile +++ b/Makefile @@ -186,7 +186,8 @@ release-victoria-metrics: \ release-victoria-metrics-darwin-amd64 \ release-victoria-metrics-darwin-arm64 \ release-victoria-metrics-freebsd-amd64 \ - release-victoria-metrics-openbsd-amd64 + release-victoria-metrics-openbsd-amd64 \ + release-victoria-metrics-windows-amd64 # adds i386 arch release-victoria-metrics-linux-386: @@ -213,6 +214,9 @@ release-victoria-metrics-freebsd-amd64: release-victoria-metrics-openbsd-amd64: GOOS=openbsd GOARCH=amd64 $(MAKE) release-victoria-metrics-goos-goarch +release-victoria-metrics-windows-amd64: + GOARCH=amd64 $(MAKE) release-victoria-metrics-windows-goarch + release-victoria-metrics-goos-goarch: victoria-metrics-$(GOOS)-$(GOARCH)-prod cd bin && \ tar --transform="flags=r;s|-$(GOOS)-$(GOARCH)||" -czf victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG).tar.gz \ @@ -222,6 +226,16 @@ release-victoria-metrics-goos-goarch: victoria-metrics-$(GOOS)-$(GOARCH)-prod | sed s/-$(GOOS)-$(GOARCH)-prod/-prod/ > victoria-metrics-$(GOOS)-$(GOARCH)-$(PKG_TAG)_checksums.txt cd bin && rm -rf victoria-metrics-$(GOOS)-$(GOARCH)-prod +release-victoria-metrics-windows-goarch: victoria-metrics-windows-$(GOARCH)-prod + cd bin && \ + zip victoria-metrics-windows-$(GOARCH)-$(PKG_TAG).zip \ + victoria-metrics-windows-$(GOARCH)-prod.exe \ + && sha256sum victoria-metrics-windows-$(GOARCH)-$(PKG_TAG).zip \ + victoria-metrics-windows-$(GOARCH)-prod.exe \ + > victoria-metrics-windows-$(GOARCH)-$(PKG_TAG)_checksums.txt + cd bin && rm -rf \ + victoria-metrics-windows-$(GOARCH)-prod.exe + release-vmutils: \ release-vmutils-linux-386 \ release-vmutils-linux-amd64 \ @@ -314,7 +328,6 @@ release-vmutils-windows-goarch: \ vmauth-windows-$(GOARCH)-prod.exe \ vmctl-windows-$(GOARCH)-prod.exe - pprof-cpu: go tool pprof -trim_path=github.com/VictoriaMetrics/VictoriaMetrics@ $(PPROF_FILE) diff --git a/README.md b/README.md index 842ae1b68..931aecfb9 100644 --- a/README.md +++ b/README.md @@ -1447,12 +1447,14 @@ can be configured with the `-inmemoryDataFlushInterval` command-line flag (note In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder, where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts` with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`. +Each partition directory contains a `parts.json` file with the actual list of parts in the partition. -The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where: +Every `part` directory contains a `metadata.json` file with the following fields: -- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part -- `blocksCount` - the number of blocks stored in the part (see details about blocks below) -- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part +- `RowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part +- `BlocksCount` - the number of blocks stored in the part (see details about blocks below) +- `MinTimestamp` and `MaxTimestamp` - minimum and maximum timestamps across raw samples stored in the part +- `MinDedupInterval` - the [deduplication interval](#deduplication) applied to the given part.
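For illustration, a `metadata.json` file with the fields above might look as follows (all values are hypothetical):

```json
{
  "RowsCount": 123456,
  "BlocksCount": 42,
  "MinTimestamp": 1667260800000,
  "MaxTimestamp": 1669852799999,
  "MinDedupInterval": 0
}
```

Timestamps and `MinDedupInterval` are expressed in milliseconds.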
Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`). Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), @@ -1474,9 +1476,8 @@ for fast block lookups, which belong to the given `TSID` and cover the given tim and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge. Newly added `parts` either successfully appear in the storage or fail to appear. -The newly added `parts` are being created in a temporary directory under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder. -When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) -to a temporary directory, then it is atomically moved to the storage directory. +The newly added `part` is atomically registered in the `parts.json` file under the corresponding partition +after it is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) to the storage. Thanks to this algorithm, storage never contains partially created parts, even if a hardware power-off occurs in the middle of writing the `part` to disk - such incompletely written `parts` are automatically deleted on the next VictoriaMetrics start. @@ -1505,8 +1506,7 @@ Retention is configured with the `-retentionPeriod` command-line flag, which tak Data is split in per-month partitions inside `<-storageDataPath>/data/{small,big}` folders. Data partitions outside the configured retention are deleted on the first day of the new month. -Each partition consists of one or more data parts with the following name pattern `rowsCount_blocksCount_minTimestamp_maxTimestamp`. -Data parts outside of the configured retention are eventually deleted during +Each partition consists of one or more data parts. Data parts outside of the configured retention are eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). The maximum disk space usage for a given `-retentionPeriod` is going to be (`-retentionPeriod` + 1) months.
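For illustration, `parts.json` is a plain JSON array listing the part directory names registered in the partition; with this change the directories are named after a hexadecimal merge index instead of the old `rowsCount_blocksCount_minTimestamp_maxTimestamp` pattern (the names below are hypothetical):

```json
["17E2B8A76F3C0001", "17E2B8A76F3C0002", "17E2B8A76F3C0003"]
```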
diff --git a/app/victoria-metrics/Makefile b/app/victoria-metrics/Makefile index baf91090c..f8fa3d089 100644 --- a/app/victoria-metrics/Makefile +++ b/app/victoria-metrics/Makefile @@ -39,6 +39,9 @@ victoria-metrics-freebsd-amd64-prod: victoria-metrics-openbsd-amd64-prod: APP_NAME=victoria-metrics $(MAKE) app-via-docker-openbsd-amd64 +victoria-metrics-windows-amd64-prod: + APP_NAME=victoria-metrics $(MAKE) app-via-docker-windows-amd64 + package-victoria-metrics: APP_NAME=victoria-metrics $(MAKE) package-via-docker @@ -100,6 +103,9 @@ victoria-metrics-freebsd-amd64: victoria-metrics-openbsd-amd64: APP_NAME=victoria-metrics CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch +victoria-metrics-windows-amd64: + GOARCH=amd64 APP_NAME=victoria-metrics $(MAKE) app-local-windows-goarch + victoria-metrics-pure: APP_NAME=victoria-metrics $(MAKE) app-local-pure diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 387fd4847..f8f8ada41 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,11 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +**Update note: this release contains a backwards-incompatible change in the storage data format, so previous versions of VictoriaMetrics will exit with the `unexpected number of substrings in the part name` error when run on data created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or a newer release.** + +* FEATURE: publish VictoriaMetrics binaries for Windows. See issues [3236](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [3821](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [70](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70). * FEATURE: log metrics with truncated labels if the length of a label value in the ingested metric exceeds `-maxLabelValueLen`. This should simplify debugging for this case. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `--kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers used by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve the data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). diff --git a/docs/README.md b/docs/README.md index 455094ef0..7817851bd 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1448,12 +1448,14 @@ can be configured with the `-inmemoryDataFlushInterval` command-line flag (note In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder, where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts` with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`. +Each partition directory contains a `parts.json` file with the actual list of parts in the partition.
-The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where: +Every `part` directory contains a `metadata.json` file with the following fields: -- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part -- `blocksCount` - the number of blocks stored in the part (see details about blocks below) -- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part +- `RowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part +- `BlocksCount` - the number of blocks stored in the part (see details about blocks below) +- `MinTimestamp` and `MaxTimestamp` - minimum and maximum timestamps across raw samples stored in the part +- `MinDedupInterval` - the [deduplication interval](#deduplication) applied to the given part. Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`). Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), @@ -1475,9 +1477,8 @@ for fast block lookups, which belong to the given `TSID` and cover the given tim and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge. Newly added `parts` either successfully appear in the storage or fail to appear. -The newly added `parts` are being created in a temporary directory under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder. -When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) -to a temporary directory, then it is atomically moved to the storage directory. +The newly added `part` is atomically registered in the `parts.json` file under the corresponding partition +after it is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) to the storage. Thanks to this algorithm, storage never contains partially created parts, even if a hardware power-off occurs in the middle of writing the `part` to disk - such incompletely written `parts` are automatically deleted on the next VictoriaMetrics start. @@ -1506,8 +1507,7 @@ Retention is configured with the `-retentionPeriod` command-line flag, which tak Data is split in per-month partitions inside `<-storageDataPath>/data/{small,big}` folders. Data partitions outside the configured retention are deleted on the first day of the new month. -Each partition consists of one or more data parts with the following name pattern `rowsCount_blocksCount_minTimestamp_maxTimestamp`. -Data parts outside of the configured retention are eventually deleted during +Each partition consists of one or more data parts. Data parts outside of the configured retention are eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). The maximum disk space usage for a given `-retentionPeriod` is going to be (`-retentionPeriod` + 1) months.
diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 814ead981..625d67898 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1451,12 +1451,14 @@ can be configured with the `-inmemoryDataFlushInterval` command-line flag (note In-memory parts are persisted to disk into `part` directories under the `<-storageDataPath>/data/small/YYYY_MM/` folder, where `YYYY_MM` is the month partition for the stored data. For example, `2022_11` is the partition for `parts` with [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) from `November 2022`. +Each partition directory contains a `parts.json` file with the actual list of parts in the partition. -The `part` directory has the following name pattern: `rowsCount_blocksCount_minTimestamp_maxTimestamp`, where: +Every `part` directory contains a `metadata.json` file with the following fields: -- `rowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part -- `blocksCount` - the number of blocks stored in the part (see details about blocks below) -- `minTimestamp` and `maxTimestamp` - minimum and maximum timestamps across raw samples stored in the part +- `RowsCount` - the number of [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) stored in the part +- `BlocksCount` - the number of blocks stored in the part (see details about blocks below) +- `MinTimestamp` and `MaxTimestamp` - minimum and maximum timestamps across raw samples stored in the part +- `MinDedupInterval` - the [deduplication interval](#deduplication) applied to the given part. Each `part` consists of `blocks` sorted by internal time series id (aka `TSID`). Each `block` contains up to 8K [raw samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), @@ -1478,9 +1480,8 @@ for fast block lookups, which belong to the given `TSID` and cover the given tim and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge. Newly added `parts` either successfully appear in the storage or fail to appear. -The newly added `parts` are being created in a temporary directory under `<-storageDataPath>/data/{small,big}/YYYY_MM/tmp` folder. -When the newly added `part` is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) -to a temporary directory, then it is atomically moved to the storage directory. +The newly added `part` is atomically registered in the `parts.json` file under the corresponding partition +after it is fully written and [fsynced](https://man7.org/linux/man-pages/man2/fsync.2.html) to the storage. Thanks to this algorithm, storage never contains partially created parts, even if a hardware power-off occurs in the middle of writing the `part` to disk - such incompletely written `parts` are automatically deleted on the next VictoriaMetrics start. @@ -1509,8 +1510,7 @@ Retention is configured with the `-retentionPeriod` command-line flag, which tak Data is split in per-month partitions inside `<-storageDataPath>/data/{small,big}` folders. Data partitions outside the configured retention are deleted on the first day of the new month. -Each partition consists of one or more data parts with the following name pattern `rowsCount_blocksCount_minTimestamp_maxTimestamp`. -Data parts outside of the configured retention are eventually deleted during +Each partition consists of one or more data parts.
Data parts outside of the configured retention are eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). The maximum disk space usage for a given `-retentionPeriod` is going to be (`-retentionPeriod` + 1) months. diff --git a/lib/mergeset/block_stream_reader.go b/lib/mergeset/block_stream_reader.go index 528a89017..08d4733e0 100644 --- a/lib/mergeset/block_stream_reader.go +++ b/lib/mergeset/block_stream_reader.go @@ -143,8 +143,8 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { path = filepath.Clean(path) - if err := bsr.ph.ParseFromPath(path); err != nil { - return fmt.Errorf("cannot parse partHeader data from %q: %w", path, err) + if err := bsr.ph.ReadMetadata(path); err != nil { + return fmt.Errorf("cannot read metadata from %q: %w", path, err) } metaindexPath := path + "/metaindex.bin" diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index f8f45f1b8..090b3f116 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -2,7 +2,6 @@ package mergeset import ( "fmt" - "path/filepath" "sync" "unsafe" @@ -68,11 +67,9 @@ type part struct { } func openFilePart(path string) (*part, error) { - path = filepath.Clean(path) - var ph partHeader - if err := ph.ParseFromPath(path); err != nil { - return nil, fmt.Errorf("cannot parse path to part: %w", err) + if err := ph.ReadMetadata(path); err != nil { + return nil, fmt.Errorf("cannot read part metadata: %w", err) } metaindexPath := path + "/metaindex.bin" diff --git a/lib/mergeset/part_header.go b/lib/mergeset/part_header.go index 43a5ea43b..4489d947c 100644 --- a/lib/mergeset/part_header.go +++ b/lib/mergeset/part_header.go @@ -5,11 +5,9 @@ import ( "encoding/json" "fmt" "os" - "path/filepath" - "strconv" - "strings" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type partHeader struct { @@ -79,50 +77,10 @@ func (ph *partHeader) CopyFrom(src *partHeader) { ph.lastItem = append(ph.lastItem[:0], src.lastItem...) } -func (ph *partHeader) ParseFromPath(partPath string) error { +func (ph *partHeader) ReadMetadata(partPath string) error { ph.Reset() - partPath = filepath.Clean(partPath) - - // Extract encoded part name. - n := strings.LastIndexByte(partPath, '/') - if n < 0 { - return fmt.Errorf("cannot find encoded part name in the path %q", partPath) - } - partName := partPath[n+1:] - - // PartName must have the following form: - // itemsCount_blocksCount_Garbage - a := strings.Split(partName, "_") - if len(a) != 3 { - return fmt.Errorf("unexpected number of substrings in the part name %q: got %d; want %d", partName, len(a), 3) - } - - // Read itemsCount from partName. - itemsCount, err := strconv.ParseUint(a[0], 10, 64) - if err != nil { - return fmt.Errorf("cannot parse itemsCount from partName %q: %w", partName, err) - } - ph.itemsCount = itemsCount - if ph.itemsCount <= 0 { - return fmt.Errorf("part %q cannot contain zero items", partPath) - } - - // Read blocksCount from partName. 
- blocksCount, err := strconv.ParseUint(a[1], 10, 64) - if err != nil { - return fmt.Errorf("cannot parse blocksCount from partName %q: %w", partName, err) - } - ph.blocksCount = blocksCount - if ph.blocksCount <= 0 { - return fmt.Errorf("part %q cannot contain zero blocks", partPath) - } - if ph.blocksCount > ph.itemsCount { - return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d", - partPath, ph.blocksCount, ph.itemsCount) - } - - // Read other ph fields from metadata. + // Read ph fields from metadata. metadataPath := partPath + "/metadata.json" metadata, err := os.ReadFile(metadataPath) if err != nil { @@ -133,12 +91,20 @@ func (ph *partHeader) ParseFromPath(partPath string) error { if err := json.Unmarshal(metadata, &phj); err != nil { return fmt.Errorf("cannot parse %q: %w", metadataPath, err) } - if ph.itemsCount != phj.ItemsCount { - return fmt.Errorf("invalid ItemsCount in %q; got %d; want %d", metadataPath, phj.ItemsCount, ph.itemsCount) + + if phj.ItemsCount <= 0 { + return fmt.Errorf("part %q cannot contain zero items", partPath) } - if ph.blocksCount != phj.BlocksCount { - return fmt.Errorf("invalid BlocksCount in %q; got %d; want %d", metadataPath, phj.BlocksCount, ph.blocksCount) + ph.itemsCount = phj.ItemsCount + + if phj.BlocksCount <= 0 { + return fmt.Errorf("part %q cannot contain zero blocks", partPath) } + if phj.BlocksCount > phj.ItemsCount { + return fmt.Errorf("the number of blocks cannot exceed the number of items in the part %q; got blocksCount=%d, itemsCount=%d", + partPath, phj.BlocksCount, phj.ItemsCount) + } + ph.blocksCount = phj.BlocksCount ph.firstItem = append(ph.firstItem[:0], phj.FirstItem...) ph.lastItem = append(ph.lastItem[:0], phj.LastItem...) @@ -146,11 +112,6 @@ func (ph *partHeader) ParseFromPath(partPath string) error { return nil } -func (ph *partHeader) Path(tablePath string, suffix uint64) string { - tablePath = filepath.Clean(tablePath) - return fmt.Sprintf("%s/%d_%d_%016X", tablePath, ph.itemsCount, ph.blocksCount, suffix) -} - func (ph *partHeader) WriteMetadata(partPath string) error { phj := &partHeaderJSON{ ItemsCount: ph.itemsCount, @@ -158,9 +119,9 @@ func (ph *partHeader) WriteMetadata(partPath string) error { FirstItem: append([]byte{}, ph.firstItem...), LastItem: append([]byte{}, ph.lastItem...), } - metadata, err := json.MarshalIndent(&phj, "", "\t") + metadata, err := json.Marshal(&phj) if err != nil { - return fmt.Errorf("cannot marshal metadata: %w", err) + logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err) } metadataPath := partPath + "/metadata.json" if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 05396cdbc..f42b7ad7c 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1,6 +1,7 @@ package mergeset import ( + "encoding/json" "errors" "fmt" "os" @@ -12,7 +13,6 @@ import ( "time" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -141,8 +141,6 @@ type Table struct { // which may need to be merged. 
needMergeCh chan struct{} - snapshotLock sync.RWMutex - flockF *os.File stopCh chan struct{} @@ -274,7 +272,9 @@ type partWrapper struct { mp *inmemoryPart - refCount uint64 + refCount uint32 + + mustBeDeleted uint32 isInMerge bool @@ -283,18 +283,22 @@ type partWrapper struct { } func (pw *partWrapper) incRef() { - atomic.AddUint64(&pw.refCount, 1) + atomic.AddUint32(&pw.refCount, 1) } func (pw *partWrapper) decRef() { - n := atomic.AddUint64(&pw.refCount, ^uint64(0)) - if int64(n) < 0 { - logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int64(n)) + n := atomic.AddUint32(&pw.refCount, ^uint32(0)) + if int32(n) < 0 { + logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n)) } if n > 0 { return } + deletePath := "" + if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 { + deletePath = pw.p.path + } if pw.mp != nil { // Do not return pw.mp to pool via putInmemoryPart(), // since pw.mp size may be too big compared to other entries stored in the pool. @@ -303,6 +307,10 @@ func (pw *partWrapper) decRef() { } pw.p.MustClose() pw.p = nil + + if deletePath != "" { + fs.MustRemoveAll(deletePath) + } } // OpenTable opens a table on the given path. @@ -512,7 +520,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.InmemoryBlocksCount += p.ph.blocksCount m.InmemoryItemsCount += p.ph.itemsCount m.InmemorySizeBytes += p.size - m.PartsRefCount += atomic.LoadUint64(&pw.refCount) + m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) } m.FilePartsCount += uint64(len(tb.fileParts)) @@ -521,7 +529,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.FileBlocksCount += p.ph.blocksCount m.FileItemsCount += p.ph.itemsCount m.FileSizeBytes += p.size - m.PartsRefCount += atomic.LoadUint64(&pw.refCount) + m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) } tb.partsLock.Unlock() @@ -1074,21 +1082,19 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal // Initialize destination paths. dstPartType := getDstPartType(pws, isFinal) - tmpPartPath, mergeIdx := tb.getDstPartPaths(dstPartType) + mergeIdx := tb.nextMergeIdx() + dstPartPath := "" + if dstPartType == partFile { + dstPartPath = fmt.Sprintf("%s/%016X", tb.path, mergeIdx) + } if isFinal && len(pws) == 1 && pws[0].mp != nil { // Fast path: flush a single in-memory part to disk. mp := pws[0].mp - if tmpPartPath == "" { - logger.Panicf("BUG: tmpPartPath must be non-empty") - } - if err := mp.StoreToDisk(tmpPartPath); err != nil { - return fmt.Errorf("cannot store in-memory part to %q: %w", tmpPartPath, err) - } - pwNew, err := tb.openCreatedPart(&mp.ph, pws, nil, tmpPartPath, mergeIdx) - if err != nil { - return fmt.Errorf("cannot atomically register the created part: %w", err) + if err := mp.StoreToDisk(dstPartPath); err != nil { + logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err) } + pwNew := tb.openCreatedPart(pws, nil, dstPartPath) tb.swapSrcWithDstParts(pws, pwNew, dstPartType) return nil } @@ -1096,7 +1102,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal // Prepare BlockStreamReaders for source parts. 
bsrs, err := openBlockStreamReaders(pws) if err != nil { - return err + logger.Panicf("FATAL: cannot open source parts for merging: %s", err) } closeBlockStreamReaders := func() { for _, bsr := range bsrs { @@ -1121,45 +1127,30 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal mpNew = &inmemoryPart{} bsw.InitFromInmemoryPart(mpNew, compressLevel) } else { - if tmpPartPath == "" { - logger.Panicf("BUG: tmpPartPath must be non-empty") - } nocache := srcItemsCount > maxItemsPerCachedPart() - if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil { - closeBlockStreamReaders() - return fmt.Errorf("cannot create destination part at %q: %w", tmpPartPath, err) + if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil { + logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err) } } // Merge source parts to destination part. - ph, err := tb.mergePartsInternal(tmpPartPath, bsw, bsrs, dstPartType, stopCh) + ph, err := tb.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh) putBlockStreamWriter(bsw) closeBlockStreamReaders() if err != nil { - return fmt.Errorf("cannot merge %d parts: %w", len(pws), err) + return err } if mpNew != nil { // Update partHeader for destination inmemory part after the merge. mpNew.ph = *ph } - // Atomically move the created part from tmpPartPath to its destination - // and swap the source parts with the newly created part. - pwNew, err := tb.openCreatedPart(ph, pws, mpNew, tmpPartPath, mergeIdx) - if err != nil { - return fmt.Errorf("cannot atomically register the created part: %w", err) - } - dstItemsCount := uint64(0) - dstBlocksCount := uint64(0) - dstSize := uint64(0) - dstPartPath := "" - if pwNew != nil { - pDst := pwNew.p - dstItemsCount = pDst.ph.itemsCount - dstBlocksCount = pDst.ph.blocksCount - dstSize = pDst.size - dstPartPath = pDst.path - } + // Atomically swap the source parts with the newly created part. 
+ pwNew := tb.openCreatedPart(pws, mpNew, dstPartPath) + pDst := pwNew.p + dstItemsCount := pDst.ph.itemsCount + dstBlocksCount := pDst.ph.blocksCount + dstSize := pDst.size tb.swapSrcWithDstParts(pws, pwNew, dstPartType) @@ -1207,19 +1198,6 @@ func getDstPartType(pws []*partWrapper, isFinal bool) partType { return partInmemory } -func (tb *Table) getDstPartPaths(dstPartType partType) (string, uint64) { - tmpPartPath := "" - mergeIdx := tb.nextMergeIdx() - switch dstPartType { - case partInmemory: - case partFile: - tmpPartPath = fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx) - default: - logger.Panicf("BUG: unknown partType=%d", dstPartType) - } - return tmpPartPath, mergeIdx -} - func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) { bsrs := make([]*blockStreamReader, 0, len(pws)) for _, pw := range pws { @@ -1239,7 +1217,7 @@ func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) { return bsrs, nil } -func (tb *Table) mergePartsInternal(tmpPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { +func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { var ph partHeader var itemsMerged *uint64 var mergesCount *uint64 @@ -1261,56 +1239,34 @@ func (tb *Table) mergePartsInternal(tmpPartPath string, bsw *blockStreamWriter, atomic.AddUint64(activeMerges, ^uint64(0)) atomic.AddUint64(mergesCount, 1) if err != nil { - return nil, fmt.Errorf("cannot merge parts to %q: %w", tmpPartPath, err) + return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err) } - if tmpPartPath != "" { - if err := ph.WriteMetadata(tmpPartPath); err != nil { - return nil, fmt.Errorf("cannot write metadata to destination part %q: %w", tmpPartPath, err) + if dstPartPath != "" { + if err := ph.WriteMetadata(dstPartPath); err != nil { + logger.Panicf("FATAL: cannot write metadata to %s: %s", dstPartPath, err) } } return &ph, nil } -func (tb *Table) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, tmpPartPath string, mergeIdx uint64) (*partWrapper, error) { - dstPartPath := "" - if mpNew == nil || !areAllInmemoryParts(pws) { - // Either source or destination parts are located on disk. - // Create a transaction for atomic deleting of old parts and moving new part to its destination on disk. - var bb bytesutil.ByteBuffer - for _, pw := range pws { - if pw.mp == nil { - fmt.Fprintf(&bb, "%s\n", pw.p.path) - } - } - dstPartPath = ph.Path(tb.path, mergeIdx) - fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath) - txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx) - if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil { - return nil, fmt.Errorf("cannot create transaction file %q: %w", txnPath, err) - } - - // Run the created transaction. - if err := runTransaction(&tb.snapshotLock, tb.path, txnPath); err != nil { - return nil, fmt.Errorf("cannot execute transaction %q: %w", txnPath, err) - } - } +func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper { // Open the created part. if mpNew != nil { // Open the created part from memory. flushToDiskDeadline := getFlushToDiskDeadline(pws) pwNew := newPartWrapperFromInmemoryPart(mpNew, flushToDiskDeadline) - return pwNew, nil + return pwNew } // Open the created part from disk. 
pNew, err := openFilePart(dstPartPath) if err != nil { - return nil, fmt.Errorf("cannot open merged part %q: %w", dstPartPath, err) + logger.Panicf("FATAL: cannot open the merged part: %s", err) } pwNew := &partWrapper{ p: pNew, refCount: 1, } - return pwNew, nil + return pwNew } func areAllInmemoryParts(pws []*partWrapper) bool { @@ -1335,19 +1291,26 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst removedFileParts := 0 tb.partsLock.Lock() + tb.inmemoryParts, removedInmemoryParts = removeParts(tb.inmemoryParts, m) tb.fileParts, removedFileParts = removeParts(tb.fileParts, m) - if pwNew != nil { - switch dstPartType { - case partInmemory: - tb.inmemoryParts = append(tb.inmemoryParts, pwNew) - case partFile: - tb.fileParts = append(tb.fileParts, pwNew) - default: - logger.Panicf("BUG: unknown partType=%d", dstPartType) - } - tb.notifyBackgroundMergers() + switch dstPartType { + case partInmemory: + tb.inmemoryParts = append(tb.inmemoryParts, pwNew) + case partFile: + tb.fileParts = append(tb.fileParts, pwNew) + default: + logger.Panicf("BUG: unknown partType=%d", dstPartType) } + tb.notifyBackgroundMergers() + + // Atomically store the updated list of file-based parts on disk. + // This must be performed under partsLock in order to prevent from races + // when multiple concurrently running goroutines update the list. + if removedFileParts > 0 || dstPartType == partFile { + mustWritePartNames(tb.fileParts, tb.path) + } + tb.partsLock.Unlock() removedParts := removedInmemoryParts + removedFileParts @@ -1355,8 +1318,10 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m)) } - // Remove references from old parts. + // Mark old parts as must be deleted and decrement reference count, + // so they are eventually closed and deleted. for _, pw := range pws { + atomic.StoreUint32(&pw.mustBeDeleted, 1) pw.decRef() } } @@ -1412,50 +1377,40 @@ func openParts(path string) ([]*partWrapper, error) { } fs.MustRemoveTemporaryDirs(path) - // Run remaining transactions and cleanup /txn and /tmp directories. - // Snapshots cannot be created yet, so use fakeSnapshotLock. - var fakeSnapshotLock sync.RWMutex - if err := runTransactions(&fakeSnapshotLock, path); err != nil { - return nil, fmt.Errorf("cannot run transactions: %w", err) - } + // Remove txn and tmp directories, which may be left after the upgrade + // to v1.90.0 and newer versions. + fs.MustRemoveAll(path + "/txn") + fs.MustRemoveAll(path + "/tmp") - txnDir := path + "/txn" - fs.MustRemoveDirAtomic(txnDir) - if err := fs.MkdirAllFailIfExist(txnDir); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", txnDir, err) - } + partNames := mustReadPartNames(path) - tmpDir := path + "/tmp" - fs.MustRemoveDirAtomic(tmpDir) - if err := fs.MkdirAllFailIfExist(tmpDir); err != nil { - return nil, fmt.Errorf("cannot create %q: %w", tmpDir, err) - } - - fs.MustSyncPath(path) - - // Open parts. + // Remove dirs missing in partNames. These dirs may be left after unclean shutdown + // or after the update from versions prior to v1.90.0. 
des, err := os.ReadDir(path) if err != nil { - return nil, fmt.Errorf("cannot read directory: %w", err) + return nil, fmt.Errorf("cannot read mergeset table dir: %w", err) + } + m := make(map[string]struct{}, len(partNames)) + for _, partName := range partNames { + m[partName] = struct{}{} } - var pws []*partWrapper for _, de := range des { if !fs.IsDirOrSymlink(de) { // Skip non-directories. continue } fn := de.Name() - if isSpecialDir(fn) { - // Skip special dirs. - continue - } - partPath := path + "/" + fn - if fs.IsEmptyDir(partPath) { - // Remove empty directory, which can be left after unclean shutdown on NFS. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142 - fs.MustRemoveDirAtomic(partPath) - continue + if _, ok := m[fn]; !ok { + deletePath := path + "/" + fn + fs.MustRemoveAll(deletePath) } + } + fs.MustSyncPath(path) + + // Open parts + var pws []*partWrapper + for _, partName := range partNames { + partPath := path + "/" + partName p, err := openFilePart(partPath) if err != nil { mustCloseParts(pws) @@ -1482,8 +1437,7 @@ func mustCloseParts(pws []*partWrapper) { // CreateSnapshotAt creates tb snapshot in the given dstDir. // -// Snapshot is created using linux hard links, so it is usually created -// very quickly. +// Snapshot is created using linux hard links, so it is usually created very quickly. // // If deadline is reached before snapshot is created error is returned. // @@ -1509,36 +1463,27 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error { // Flush inmemory items to disk. tb.flushInmemoryItems() - // The snapshot must be created under the lock in order to prevent from - // concurrent modifications via runTransaction. - tb.snapshotLock.Lock() - defer tb.snapshotLock.Unlock() - if err := fs.MkdirAllFailIfExist(dstDir); err != nil { return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err) } - des, err := os.ReadDir(srcDir) - if err != nil { - return fmt.Errorf("cannot read directory: %w", err) - } + pws := tb.getParts(nil) + defer tb.putParts(pws) - for _, de := range des { + // Create a file with part names at dstDir + mustWritePartNames(pws, dstDir) + + // Make hardlinks for pws at dstDir + for _, pw := range pws { + if pw.mp != nil { + // Skip in-memory parts + continue + } if deadline > 0 && fasttime.UnixTimestamp() > deadline { return fmt.Errorf("cannot create snapshot for %q: timeout exceeded", tb.path) } - - fn := de.Name() - if !fs.IsDirOrSymlink(de) { - // Skip non-directories. - continue - } - if isSpecialDir(fn) { - // Skip special dirs. - continue - } - srcPartPath := srcDir + "/" + fn - dstPartPath := dstDir + "/" + fn + srcPartPath := pw.p.path + dstPartPath := dstDir + "/" + filepath.Base(srcPartPath) if err := fs.HardLinkFiles(srcPartPath, dstPartPath); err != nil { return fmt.Errorf("cannot create hard links from %q to %q: %w", srcPartPath, dstPartPath, err) } @@ -1552,129 +1497,60 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error { return nil } -func runTransactions(txnLock *sync.RWMutex, path string) error { - // Wait until all the previous pending transaction deletions are finished. - pendingTxnDeletionsWG.Wait() - - // Make sure all the current transaction deletions are finished before exiting.
- defer pendingTxnDeletionsWG.Wait() - - txnDir := path + "/txn" - des, err := os.ReadDir(txnDir) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return fmt.Errorf("cannot read transaction dir: %w", err) - } - - // Sort transaction files by id, since transactions must be ordered. - sort.Slice(des, func(i, j int) bool { - return des[i].Name() < des[j].Name() - }) - - for _, de := range des { - fn := de.Name() - if fs.IsTemporaryFileName(fn) { - // Skip temporary files, which could be left after unclean shutdown. +func mustWritePartNames(pws []*partWrapper, dstDir string) { + partNames := make([]string, 0, len(pws)) + for _, pw := range pws { + if pw.mp != nil { + // Skip in-memory parts continue } - txnPath := txnDir + "/" + fn - if err := runTransaction(txnLock, path, txnPath); err != nil { - return fmt.Errorf("cannot run transaction from %q: %w", txnPath, err) - } + partName := filepath.Base(pw.p.path) + partNames = append(partNames, partName) + } + sort.Strings(partNames) + data, err := json.Marshal(partNames) + if err != nil { + logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err) + } + partNamesPath := dstDir + "/parts.json" + if err := fs.WriteFileAtomically(partNamesPath, data, true); err != nil { + logger.Panicf("FATAL: cannot update %s: %s", partNamesPath, err) } - return nil } -func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { - // The transaction must run under read lock in order to provide - // consistent snapshots with Table.CreateSnapshot(). - txnLock.RLock() - defer txnLock.RUnlock() - - data, err := os.ReadFile(txnPath) - if err != nil { - return fmt.Errorf("cannot read transaction file: %w", err) - } - if len(data) > 0 && data[len(data)-1] == '\n' { - data = data[:len(data)-1] - } - paths := strings.Split(string(data), "\n") - - if len(paths) == 0 { - return fmt.Errorf("empty transaction") - } - rmPaths := paths[:len(paths)-1] - mvPaths := strings.Split(paths[len(paths)-1], " -> ") - if len(mvPaths) != 2 { - return fmt.Errorf("invalid last line in the transaction file: got %q; must contain `srcPath -> dstPath`", paths[len(paths)-1]) - } - - // Remove old paths. It is OK if certain paths don't exist. - for _, path := range rmPaths { - path, err := validatePath(pathPrefix, path) - if err != nil { - return fmt.Errorf("invalid path to remove: %w", err) +func mustReadPartNames(srcDir string) []string { + partNamesPath := srcDir + "/parts.json" + data, err := os.ReadFile(partNamesPath) + if err == nil { + var partNames []string + if err := json.Unmarshal(data, &partNames); err != nil { + logger.Panicf("FATAL: cannot parse %s: %s", partNamesPath, err) } - fs.MustRemoveDirAtomic(path) + return partNames } - - // Move the new part to new directory. - srcPath := mvPaths[0] - dstPath := mvPaths[1] - srcPath, err = validatePath(pathPrefix, srcPath) + if !os.IsNotExist(err) { + logger.Panicf("FATAL: cannot read parts.json file: %s", err) + } + // The parts.json file is missing. This means an upgrade from a version prior to v1.90.0.
+ // Read part names from directories under srcDir + des, err := os.ReadDir(srcDir) if err != nil { - return fmt.Errorf("invalid source path to rename: %w", err) + logger.Panicf("FATAL: cannot read mergeset table dir: %s", err) } - dstPath, err = validatePath(pathPrefix, dstPath) - if err != nil { - return fmt.Errorf("invalid destination path to rename: %w", err) - } - if fs.IsPathExist(srcPath) { - if err := os.Rename(srcPath, dstPath); err != nil { - return fmt.Errorf("cannot rename %q to %q: %w", srcPath, dstPath, err) + var partNames []string + for _, de := range des { + if !fs.IsDirOrSymlink(de) { + // Skip non-directories. + continue } - } else if !fs.IsPathExist(dstPath) { - // Emit info message for the expected condition after unclean shutdown on NFS disk. - // The dstPath part may be missing because it could be already merged into bigger part - // while old source parts for the current txn weren't still deleted due to NFS locks. - logger.Infof("cannot find both source and destination paths: %q -> %q; this may be the case after unclean shutdown (OOM, `kill -9`, hard reset) on NFS disk", - srcPath, dstPath) - } - - // Flush pathPrefix directory metadata to the underlying storage. - fs.MustSyncPath(pathPrefix) - - pendingTxnDeletionsWG.Add(1) - go func() { - defer pendingTxnDeletionsWG.Done() - if err := os.Remove(txnPath); err != nil { - logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) + partName := de.Name() + if isSpecialDir(partName) { + // Skip special dirs. + continue } - }() - - return nil -} - -var pendingTxnDeletionsWG syncwg.WaitGroup - -func validatePath(pathPrefix, path string) (string, error) { - var err error - - pathPrefix, err = filepath.Abs(pathPrefix) - if err != nil { - return path, fmt.Errorf("cannot determine absolute path for pathPrefix=%q: %w", pathPrefix, err) + partNames = append(partNames, partName) } - - path, err = filepath.Abs(path) - if err != nil { - return path, fmt.Errorf("cannot determine absolute path for %q: %w", path, err) - } - if !strings.HasPrefix(path, pathPrefix+"/") { - return path, fmt.Errorf("invalid path %q; must start with %q", path, pathPrefix+"/") - } - return path, nil + return partNames } // getPartsToMerge returns optimal parts to merge from pws. 
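The parts.json handling above boils down to: read the authoritative part list when the file exists, otherwise fall back to scanning part directories left by pre-v1.90.0 versions. Below is a condensed, standalone sketch of that logic (simplified from `mustReadPartNames` above; it uses plain `IsDir` instead of the repository's `fs.IsDirOrSymlink` helper, skips no special dirs, and returns errors instead of panicking):

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// readPartNames returns the part list from parts.json when present.
// When the file is missing (pre-v1.90.0 layout), every subdirectory
// is treated as a part, mirroring the upgrade path above.
func readPartNames(dir string) ([]string, error) {
	data, err := os.ReadFile(dir + "/parts.json")
	if err == nil {
		var names []string
		if err := json.Unmarshal(data, &names); err != nil {
			return nil, fmt.Errorf("cannot parse parts.json: %w", err)
		}
		return names, nil
	}
	if !os.IsNotExist(err) {
		return nil, err
	}
	// parts.json is missing - fall back to directory scanning.
	des, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var names []string
	for _, de := range des {
		if de.IsDir() {
			names = append(names, de.Name())
		}
	}
	return names, nil
}

func main() {
	names, err := readPartNames(".")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("parts:", names)
}
```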
diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go index 2f9e6fc59..e08817f28 100644 --- a/lib/storage/block_stream_reader.go +++ b/lib/storage/block_stream_reader.go @@ -141,7 +141,7 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error { path = filepath.Clean(path) - if err := bsr.ph.ParseFromPath(path); err != nil { - return fmt.Errorf("cannot parse path to part: %w", err) + if err := bsr.ph.ReadMetadata(path); err != nil { + return fmt.Errorf("cannot read metadata from %q: %w", path, err) } diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index 70f05c15f..3886795de 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -56,8 +56,8 @@ func (mp *inmemoryPart) StoreToDisk(path string) error { if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil { return fmt.Errorf("cannot store metaindex: %w", err) } - if err := mp.ph.writeMinDedupInterval(path); err != nil { - return fmt.Errorf("cannot store min dedup interval: %w", err) + if err := mp.ph.WriteMetadata(path); err != nil { + return fmt.Errorf("cannot store metadata: %w", err) } // Sync parent directory in order to make sure the written files remain visible after hardware reset parentDirPath := filepath.Dir(path) diff --git a/lib/storage/part.go b/lib/storage/part.go index 956ab7bae..2deccbe48 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -50,7 +50,7 @@ func openFilePart(path string) (*part, error) { path = filepath.Clean(path) var ph partHeader - if err := ph.ParseFromPath(path); err != nil { - return nil, fmt.Errorf("cannot parse path to part: %w", err) + if err := ph.ReadMetadata(path); err != nil { + return nil, fmt.Errorf("cannot read part metadata: %w", err) } diff --git a/lib/storage/part_header.go b/lib/storage/part_header.go index 8896dacd8..ee61724fb 100644 --- a/lib/storage/part_header.go +++ b/lib/storage/part_header.go @@ -1,6 +1,7 @@ package storage import ( + "encoding/json" "errors" "fmt" "os" @@ -10,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) @@ -33,12 +35,35 @@ type partHeader struct { // String returns string representation of ph. func (ph *partHeader) String() string { - return fmt.Sprintf("%d_%d_%s_%s", ph.RowsCount, ph.BlocksCount, toUserReadableTimestamp(ph.MinTimestamp), toUserReadableTimestamp(ph.MaxTimestamp)) + return fmt.Sprintf("partHeader{rowsCount=%d,blocksCount=%d,minTimestamp=%d,maxTimestamp=%d}", ph.RowsCount, ph.BlocksCount, ph.MinTimestamp, ph.MaxTimestamp) } -func toUserReadableTimestamp(timestamp int64) string { - t := timestampToTime(timestamp) - return t.Format(userReadableTimeFormat) +// Reset resets the ph. +func (ph *partHeader) Reset() { + ph.RowsCount = 0 + ph.BlocksCount = 0 + ph.MinTimestamp = (1 << 63) - 1 + ph.MaxTimestamp = -1 << 63 + ph.MinDedupInterval = 0 +} + +func (ph *partHeader) readMinDedupInterval(partPath string) error { + filePath := partPath + "/min_dedup_interval" + data, err := os.ReadFile(filePath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // The minimum dedup interval may not exist for old parts.
+ ph.MinDedupInterval = 0 + return nil + } + return fmt.Errorf("cannot read %q: %w", filePath, err) + } + dedupInterval, err := promutils.ParseDuration(string(data)) + if err != nil { + return fmt.Errorf("cannot parse minimum dedup interval %q at %q: %w", data, filePath, err) + } + ph.MinDedupInterval = dedupInterval.Milliseconds() + return nil } func fromUserReadableTimestamp(s string) (int64, error) { @@ -51,15 +76,6 @@ func fromUserReadableTimestamp(s string) (int64, error) { const userReadableTimeFormat = "20060102150405.000" -// Path returns a path to part header with the given prefix and suffix. -// -// Suffix must be random. -func (ph *partHeader) Path(prefix string, suffix uint64) string { - prefix = filepath.Clean(prefix) - s := ph.String() - return fmt.Sprintf("%s/%s_%016X", prefix, s, suffix) -} - // ParseFromPath extracts ph info from the given path. func (ph *partHeader) ParseFromPath(path string) error { ph.Reset() @@ -119,40 +135,48 @@ func (ph *partHeader) ParseFromPath(path string) error { return nil } -// Reset resets the ph. -func (ph *partHeader) Reset() { - ph.RowsCount = 0 - ph.BlocksCount = 0 - ph.MinTimestamp = (1 << 63) - 1 - ph.MaxTimestamp = -1 << 63 - ph.MinDedupInterval = 0 -} +func (ph *partHeader) ReadMetadata(partPath string) error { + ph.Reset() -func (ph *partHeader) readMinDedupInterval(partPath string) error { - filePath := partPath + "/min_dedup_interval" - data, err := os.ReadFile(filePath) + metadataPath := partPath + "/metadata.json" + metadata, err := os.ReadFile(metadataPath) if err != nil { - if errors.Is(err, os.ErrNotExist) { - // The minimum dedup interval may not exist for old parts. - ph.MinDedupInterval = 0 - return nil + if os.IsNotExist(err) { + // This is a part created before v1.90.0. + // Fall back to reading the metadata from the partPath itself. + return ph.ParseFromPath(partPath) } - return fmt.Errorf("cannot read %q: %w", filePath, err) + return fmt.Errorf("cannot read %q: %w", metadataPath, err) } - dedupInterval, err := promutils.ParseDuration(string(data)) - if err != nil { - return fmt.Errorf("cannot parse minimum dedup interval %q at %q: %w", data, filePath, err) + if err := json.Unmarshal(metadata, ph); err != nil { + return fmt.Errorf("cannot parse %q: %w", metadataPath, err) } - ph.MinDedupInterval = dedupInterval.Milliseconds() + + // Perform various checks + if ph.MinTimestamp > ph.MaxTimestamp { + return fmt.Errorf("minTimestamp cannot exceed maxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp) + } + if ph.RowsCount <= 0 { + return fmt.Errorf("rowsCount must be greater than 0; got %d", ph.RowsCount) + } + if ph.BlocksCount <= 0 { + return fmt.Errorf("blocksCount must be greater than 0; got %d", ph.BlocksCount) + } + if ph.BlocksCount > ph.RowsCount { + return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount) + } + return nil } -func (ph *partHeader) writeMinDedupInterval(partPath string) error { - filePath := partPath + "/min_dedup_interval" - dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond - data := dedupInterval.String() - if err := fs.WriteFileAtomically(filePath, []byte(data), false); err != nil { - return fmt.Errorf("cannot create %q: %w", filePath, err) +func (ph *partHeader) WriteMetadata(partPath string) error { + metadata, err := json.Marshal(ph) + if err != nil { + logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err) + } + metadataPath := partPath + "/metadata.json" + if err :=
fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { + return fmt.Errorf("cannot create %q: %w", metadataPath, err) } return nil } diff --git a/lib/storage/part_header_test.go b/lib/storage/part_header_test.go deleted file mode 100644 index a4c20c0b1..000000000 --- a/lib/storage/part_header_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package storage - -import ( - "testing" -) - -func TestPartHeaderParseFromPath(t *testing.T) { - testParseFromPathError := func(path string) { - t.Helper() - - var ph partHeader - if err := ph.ParseFromPath(path); err == nil { - t.Fatalf("expecting non-nil error") - } - } - - t.Run("Error", func(t *testing.T) { - testParseFromPathError("") - testParseFromPathError("foobar") - testParseFromPathError("/foo/bar") - testParseFromPathError("/rowscount_mintimestamp_maxtimestamp_garbage") - testParseFromPathError("/rowscount_mintimestamp_maxtimestamp_garbage") - testParseFromPathError("/12_3456_mintimestamp_maxtimestamp_garbage") - testParseFromPathError("/12_3456_20181011010203.456_maxtimestamp_garbage") - testParseFromPathError("/12_3456_20181011010203.456_20181011010202.456_garbage") - testParseFromPathError("12_3456_20181011010203.456_20181011010203.457_garbage") - testParseFromPathError("12_3456_20181011010203.456_20181011010203.457_garbage/") - - // MinTimestamp > MaxTimetamp - testParseFromPathError("1233_456_20181011010203.456_20181011010202.457_garbage") - - // Zero rowsCount - testParseFromPathError("0_123_20181011010203.456_20181011010203.457_garbage") - - // Zero blocksCount - testParseFromPathError("123_0_20181011010203.456_20181011010203.457_garbage") - - // blocksCount > rowsCount - testParseFromPathError("123_456_20181011010203.456_20181011010203.457_garbage") - }) - - testParseFromPathSuccess := func(path string, phStringExpected string) { - t.Helper() - - var ph partHeader - if err := ph.ParseFromPath(path); err != nil { - t.Fatalf("unexpected error when parsing path %q: %s", path, err) - } - phString := ph.String() - if phString != phStringExpected { - t.Fatalf("unexpected partHeader string for path %q: got %q; want %q", path, phString, phStringExpected) - } - } - - t.Run("Success", func(t *testing.T) { - testParseFromPathSuccess("/1233_456_20181011010203.456_20181011010203.457_garbage", "1233_456_20181011010203.456_20181011010203.457") - testParseFromPathSuccess("/1233_456_20181011010203.456_20181011010203.457_garbage/", "1233_456_20181011010203.456_20181011010203.457") - testParseFromPathSuccess("/1233_456_20181011010203.456_20181011010203.457_garbage///", "1233_456_20181011010203.456_20181011010203.457") - testParseFromPathSuccess("/var/lib/tsdb/1233_456_20181011010203.456_20181011010203.457_garbage///", "1233_456_20181011010203.456_20181011010203.457") - testParseFromPathSuccess("/var/lib/tsdb/456_456_20181011010203.456_20181011010203.457_232345///", "456_456_20181011010203.456_20181011010203.457") - }) -} diff --git a/lib/storage/partition.go b/lib/storage/partition.go index f41381300..04e35cb06 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1,6 +1,7 @@ package storage import ( + "encoding/json" "errors" "fmt" "os" @@ -12,7 +13,6 @@ import ( "time" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -20,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) // The maximum size of big part. @@ -158,8 +157,6 @@ type partition struct { // which may need to be merged. needMergeCh chan struct{} - snapshotLock sync.RWMutex - stopCh chan struct{} wg sync.WaitGroup @@ -167,11 +164,11 @@ type partition struct { // partWrapper is a wrapper for the part. type partWrapper struct { - // Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - // The number of references to the part. - refCount uint64 + refCount uint32 + + // The flag, which is set when the part must be deleted after refCount reaches zero. + mustBeDeleted uint32 // The part itself. p *part @@ -187,24 +184,32 @@ type partWrapper struct { } func (pw *partWrapper) incRef() { - atomic.AddUint64(&pw.refCount, 1) + atomic.AddUint32(&pw.refCount, 1) } func (pw *partWrapper) decRef() { - n := atomic.AddUint64(&pw.refCount, ^uint64(0)) - if int64(n) < 0 { - logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int64(n)) + n := atomic.AddUint32(&pw.refCount, ^uint32(0)) + if int32(n) < 0 { + logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n)) } if n > 0 { return } + deletePath := "" + if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 { + deletePath = pw.p.path + } if pw.mp != nil { putInmemoryPart(pw.mp) pw.mp = nil } pw.p.MustClose() pw.p = nil + + if deletePath != "" { + fs.MustRemoveAll(deletePath) + } } // createPartition creates new partition for the given timestamp and the given paths @@ -215,11 +220,11 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name logger.Infof("creating a partition %q with smallPartsPath=%q, bigPartsPath=%q", name, smallPartsPath, bigPartsPath) - if err := createPartitionDirs(smallPartsPath); err != nil { - return nil, fmt.Errorf("cannot create directories for small parts %q: %w", smallPartsPath, err) + if err := fs.MkdirAllFailIfExist(smallPartsPath); err != nil { + return nil, fmt.Errorf("cannot create directory for small parts %q: %w", smallPartsPath, err) } - if err := createPartitionDirs(bigPartsPath); err != nil { - return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err) + if err := fs.MkdirAllFailIfExist(bigPartsPath); err != nil { + return nil, fmt.Errorf("cannot create directory for big parts %q: %w", bigPartsPath, err) } pt := newPartition(name, smallPartsPath, bigPartsPath, s) @@ -243,8 +248,6 @@ func (pt *partition) startBackgroundWorkers() { // The pt must be detached from table before calling pt.Drop. func (pt *partition) Drop() { logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath) - // Wait until all the pending transaction deletions are finished before removing partition directories. 
- pendingTxnDeletionsWG.Wait() fs.MustRemoveDirAtomic(pt.smallPartsPath) fs.MustRemoveDirAtomic(pt.bigPartsPath) @@ -266,11 +269,13 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, return nil, fmt.Errorf("partition name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name) } - smallParts, err := openParts(smallPartsPath, bigPartsPath, smallPartsPath) + partNamesSmall, partNamesBig := mustReadPartNames(smallPartsPath, bigPartsPath) + + smallParts, err := openParts(smallPartsPath, partNamesSmall) if err != nil { return nil, fmt.Errorf("cannot open small parts from %q: %w", smallPartsPath, err) } - bigParts, err := openParts(smallPartsPath, bigPartsPath, bigPartsPath) + bigParts, err := openParts(bigPartsPath, partNamesBig) if err != nil { mustCloseParts(smallParts) return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err) } @@ -375,21 +380,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.InmemoryRowsCount += p.ph.RowsCount m.InmemoryBlocksCount += p.ph.BlocksCount m.InmemorySizeBytes += p.size - m.InmemoryPartsRefCount += atomic.LoadUint64(&pw.refCount) + m.InmemoryPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) } for _, pw := range pt.smallParts { p := pw.p m.SmallRowsCount += p.ph.RowsCount m.SmallBlocksCount += p.ph.BlocksCount m.SmallSizeBytes += p.size - m.SmallPartsRefCount += atomic.LoadUint64(&pw.refCount) + m.SmallPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) } for _, pw := range pt.bigParts { p := pw.p m.BigRowsCount += p.ph.RowsCount m.BigBlocksCount += p.ph.BlocksCount m.BigSizeBytes += p.size - m.BigPartsRefCount += atomic.LoadUint64(&pw.refCount) + m.BigPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) } m.InmemoryPartsCount += uint64(len(pt.inmemoryParts)) @@ -751,19 +756,12 @@ func (pt *partition) HasTimestamp(timestamp int64) bool { func (pt *partition) GetParts(dst []*partWrapper, addInMemory bool) []*partWrapper { pt.partsLock.Lock() if addInMemory { - for _, pw := range pt.inmemoryParts { - pw.incRef() - } + incRefForParts(pt.inmemoryParts) dst = append(dst, pt.inmemoryParts...) } - - for _, pw := range pt.smallParts { - pw.incRef() - } + incRefForParts(pt.smallParts) dst = append(dst, pt.smallParts...) - for _, pw := range pt.bigParts { - pw.incRef() - } + incRefForParts(pt.bigParts) dst = append(dst, pt.bigParts...) pt.partsLock.Unlock() @@ -777,15 +775,18 @@ func (pt *partition) PutParts(pws []*partWrapper) { } } +func incRefForParts(pws []*partWrapper) { + for _, pw := range pws { + pw.incRef() + } +} + // MustClose closes the pt, so the app may safely exit. // // The pt must be detached from table before calling pt.MustClose. func (pt *partition) MustClose() { close(pt.stopCh) - // Wait until all the pending transaction deletions are finished. - pendingTxnDeletionsWG.Wait() - logger.Infof("waiting for service workers to stop on %q...", pt.smallPartsPath) startTime := time.Now() pt.wg.Wait() @@ -1277,7 +1278,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi // Initialize destination paths.
@@ -1277,7 +1278,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 
 	// Initialize destination paths.
 	dstPartType := pt.getDstPartType(pws, isFinal)
-	ptPath, tmpPartPath, mergeIdx := pt.getDstPartPaths(dstPartType)
+	mergeIdx := pt.nextMergeIdx()
+	dstPartPath := pt.getDstPartPath(dstPartType, mergeIdx)
 
 	if dstPartType == partBig {
 		bigMergeWorkersLimitCh <- struct{}{}
@@ -1289,17 +1291,10 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 	if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil {
 		// Fast path: flush a single in-memory part to disk.
 		mp := pws[0].mp
-		if tmpPartPath == "" {
-			logger.Panicf("BUG: tmpPartPath must be non-empty")
-		}
-
-		if err := mp.StoreToDisk(tmpPartPath); err != nil {
-			return fmt.Errorf("cannot store in-memory part to %q: %w", tmpPartPath, err)
-		}
-		pwNew, err := pt.openCreatedPart(&mp.ph, pws, nil, ptPath, tmpPartPath, mergeIdx)
-		if err != nil {
-			return fmt.Errorf("cannot atomically register the created part: %w", err)
+		if err := mp.StoreToDisk(dstPartPath); err != nil {
+			logger.Panicf("FATAL: cannot store in-memory part to %s: %s", dstPartPath, err)
 		}
+		pwNew := pt.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
 		pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
 		return nil
 	}
@@ -1307,7 +1302,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 	// Prepare BlockStreamReaders for source parts.
 	bsrs, err := openBlockStreamReaders(pws)
 	if err != nil {
-		return err
+		logger.Panicf("FATAL: cannot open source parts for merging: %s", err)
 	}
 	closeBlockStreamReaders := func() {
 		for _, bsr := range bsrs {
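Note: with the tmp/ staging directory gone, a merged part is written straight to its final location, named by the 16-digit hex merge index. A tiny illustration of the path scheme (the sample partition path is made up):

```go
package main

import "fmt"

func main() {
	// mergeIdx would come from the partition's atomically incremented
	// nextMergeIdx() counter; it is fixed here for illustration.
	mergeIdx := uint64(42)
	ptPath := "/victoria-metrics-data/data/small/2023_03"
	// Parts are written directly into their final directory, named by
	// the 16-digit hex merge index instead of a transient tmp/ path.
	dstPartPath := fmt.Sprintf("%s/%016X", ptPath, mergeIdx)
	fmt.Println(dstPartPath) // .../2023_03/000000000000002A
}
```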
@@ -1333,45 +1328,38 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
 		mpNew = getInmemoryPart()
 		bsw.InitFromInmemoryPart(mpNew, compressLevel)
 	} else {
-		if tmpPartPath == "" {
-			logger.Panicf("BUG: tmpPartPath must be non-empty")
+		if dstPartPath == "" {
+			logger.Panicf("BUG: dstPartPath must be non-empty")
 		}
 		nocache := dstPartType == partBig
-		if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
-			closeBlockStreamReaders()
-			return fmt.Errorf("cannot create destination part at %q: %w", tmpPartPath, err)
+		if err := bsw.InitFromFilePart(dstPartPath, nocache, compressLevel); err != nil {
+			logger.Panicf("FATAL: cannot create destination part at %s: %s", dstPartPath, err)
 		}
 	}
 
 	// Merge source parts to destination part.
-	ph, err := pt.mergePartsInternal(tmpPartPath, bsw, bsrs, dstPartType, stopCh)
+	ph, err := pt.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
 	putBlockStreamWriter(bsw)
 	closeBlockStreamReaders()
 	if err != nil {
-		return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
+		return err
 	}
 	if mpNew != nil {
 		// Update partHeader for destination inmemory part after the merge.
 		mpNew.ph = *ph
 	}
 
-	// Atomically move the created part from tmpPartPath to its destination
-	// and swap the source parts with the newly created part.
-	pwNew, err := pt.openCreatedPart(ph, pws, mpNew, ptPath, tmpPartPath, mergeIdx)
-	if err != nil {
-		return fmt.Errorf("cannot atomically register the created part: %w", err)
-	}
+	// Atomically swap the source parts with the newly created part.
+	pwNew := pt.openCreatedPart(ph, pws, mpNew, dstPartPath)
 
 	dstRowsCount := uint64(0)
 	dstBlocksCount := uint64(0)
 	dstSize := uint64(0)
-	dstPartPath := ""
 	if pwNew != nil {
 		pDst := pwNew.p
 		dstRowsCount = pDst.ph.RowsCount
 		dstBlocksCount = pDst.ph.BlocksCount
 		dstSize = pDst.size
-		dstPartPath = pDst.String()
 	}
 
 	pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
@@ -1424,7 +1412,7 @@ func (pt *partition) getDstPartType(pws []*partWrapper, isFinal bool) partType {
 	return partInmemory
 }
 
-func (pt *partition) getDstPartPaths(dstPartType partType) (string, string, uint64) {
+func (pt *partition) getDstPartPath(dstPartType partType, mergeIdx uint64) string {
 	ptPath := ""
 	switch dstPartType {
 	case partSmall:
@@ -1436,13 +1424,11 @@ func (pt *partition) getDstPartPaths(dstPartType partType) (string, string, uint
 	default:
 		logger.Panicf("BUG: unknown partType=%d", dstPartType)
 	}
-	ptPath = filepath.Clean(ptPath)
-	mergeIdx := pt.nextMergeIdx()
-	tmpPartPath := ""
+	dstPartPath := ""
 	if dstPartType != partInmemory {
-		tmpPartPath = fmt.Sprintf("%s/tmp/%016X", ptPath, mergeIdx)
+		dstPartPath = fmt.Sprintf("%s/%016X", ptPath, mergeIdx)
 	}
-	return ptPath, tmpPartPath, mergeIdx
+	return dstPartPath
 }
 
 func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
@@ -1464,7 +1450,7 @@ func openBlockStreamReaders(pws []*partWrapper) ([]*blockStreamReader, error) {
 	return bsrs, nil
 }
 
-func (pt *partition) mergePartsInternal(tmpPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
+func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
 	var ph partHeader
 	var rowsMerged *uint64
 	var rowsDeleted *uint64
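Note: the hunk below replaces writeMinDedupInterval() with ph.WriteMetadata(), which persists the part header as `metadata.json` with the fields documented in the README (RowsCount, BlocksCount, MinTimestamp, MaxTimestamp, MinDedupInterval). A speculative sketch of such a writer; the real partHeader implementation certainly differs in details such as fsync and validation:

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// headerSketch mirrors the metadata.json fields documented in the
// README; the real partHeader carries more internal state.
type headerSketch struct {
	RowsCount        uint64
	BlocksCount      uint64
	MinTimestamp     int64
	MaxTimestamp     int64
	MinDedupInterval int64
}

// writeMetadata marshals the header to <partDir>/metadata.json.
// Error handling and durability (fsync) are simplified here.
func (h *headerSketch) writeMetadata(partDir string) error {
	data, err := json.Marshal(h)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(partDir, "metadata.json"), data, 0o644)
}

func main() {
	dir, err := os.MkdirTemp("", "part")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	h := &headerSketch{RowsCount: 1000, BlocksCount: 2, MinTimestamp: 1678886400000, MaxTimestamp: 1678972800000}
	if err := h.writeMetadata(dir); err != nil {
		panic(err)
	}
	fmt.Println("metadata written to", filepath.Join(dir, "metadata.json"))
}
```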
@@ -1495,64 +1481,42 @@ func (pt *partition) mergePartsInternal(tmpPartPath string, bsw *blockStreamWrit
 	atomic.AddUint64(activeMerges, ^uint64(0))
 	atomic.AddUint64(mergesCount, 1)
 	if err != nil {
-		return nil, fmt.Errorf("cannot merge parts to %q: %w", tmpPartPath, err)
+		return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
 	}
-	if tmpPartPath != "" {
+	if dstPartPath != "" {
 		ph.MinDedupInterval = GetDedupInterval()
-		if err := ph.writeMinDedupInterval(tmpPartPath); err != nil {
-			return nil, fmt.Errorf("cannot store min dedup interval: %w", err)
+		if err := ph.WriteMetadata(dstPartPath); err != nil {
+			logger.Panicf("FATAL: cannot store metadata to %s: %s", dstPartPath, err)
 		}
 	}
 	return &ph, nil
 }
 
-func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, ptPath, tmpPartPath string, mergeIdx uint64) (*partWrapper, error) {
-	dstPartPath := ""
-	if mpNew == nil || !areAllInmemoryParts(pws) {
-		// Either source or destination parts are located on disk.
-		// Create a transaction for atomic deleting of old parts and moving new part to its destination on disk.
-		var bb bytesutil.ByteBuffer
-		for _, pw := range pws {
-			if pw.mp == nil {
-				fmt.Fprintf(&bb, "%s\n", pw.p.path)
-			}
-		}
-		if ph.RowsCount > 0 {
-			// The destination part may have no rows if they are deleted during the merge.
-			dstPartPath = ph.Path(ptPath, mergeIdx)
-		}
-		fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
-		txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
-		if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
-			return nil, fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
-		}
-
-		// Run the created transaction.
-		if err := runTransaction(&pt.snapshotLock, pt.smallPartsPath, pt.bigPartsPath, txnPath); err != nil {
-			return nil, fmt.Errorf("cannot execute transaction %q: %w", txnPath, err)
-		}
-	}
+func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper {
 	// Open the created part.
 	if ph.RowsCount == 0 {
-		// The created part is empty.
-		return nil, nil
+		// The created part is empty. Remove it.
+		if mpNew == nil {
+			fs.MustRemoveAll(dstPartPath)
+		}
+		return nil
 	}
 	if mpNew != nil {
 		// Open the created part from memory.
 		flushToDiskDeadline := getFlushToDiskDeadline(pws)
 		pwNew := newPartWrapperFromInmemoryPart(mpNew, flushToDiskDeadline)
-		return pwNew, nil
+		return pwNew
 	}
 	// Open the created part from disk.
 	pNew, err := openFilePart(dstPartPath)
 	if err != nil {
-		return nil, fmt.Errorf("cannot open merged part %q: %w", dstPartPath, err)
+		logger.Panicf("FATAL: cannot open merged part %s: %s", dstPartPath, err)
 	}
 	pwNew := &partWrapper{
 		p:        pNew,
 		refCount: 1,
 	}
-	return pwNew, nil
+	return pwNew
 }
 
 func areAllInmemoryParts(pws []*partWrapper) bool {
@@ -1578,6 +1542,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 	removedBigParts := 0
 
 	pt.partsLock.Lock()
+
 	pt.inmemoryParts, removedInmemoryParts = removeParts(pt.inmemoryParts, m)
 	pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m)
 	pt.bigParts, removedBigParts = removeParts(pt.bigParts, m)
@@ -1594,6 +1559,14 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 		}
 		pt.notifyBackgroundMergers()
 	}
+
+	// Atomically store the updated list of file-based parts on disk.
+	// This must be performed under partsLock in order to prevent from races
+	// when multiple concurrently running goroutines update the list.
+	if removedSmallParts > 0 || removedBigParts > 0 || pwNew != nil && (dstPartType == partSmall || dstPartType == partBig) {
+		mustWritePartNames(pt.smallParts, pt.bigParts, pt.smallPartsPath)
+	}
+
 	pt.partsLock.Unlock()
 
 	removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
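Note: the hunk above moves the parts.json update inside partsLock, making the in-memory list swap and its on-disk registration a single critical section. A toy model of that swap-then-persist pattern (all names hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

// pw and ptSketch are hypothetical stand-ins for partWrapper/partition.
type pw struct{ name string }

type ptSketch struct {
	mu    sync.Mutex
	parts []*pw
}

// swap replaces the source parts with the merged destination part and
// persists the resulting name list while still holding the lock, so
// two concurrent merges cannot interleave their parts.json updates.
func (pt *ptSketch) swap(src map[*pw]bool, dst *pw) {
	pt.mu.Lock()
	kept := pt.parts[:0]
	for _, p := range pt.parts {
		if !src[p] {
			kept = append(kept, p)
		}
	}
	if dst != nil {
		kept = append(kept, dst)
	}
	pt.parts = kept
	pt.persistNamesLocked() // stand-in for mustWritePartNames
	pt.mu.Unlock()
}

func (pt *ptSketch) persistNamesLocked() {
	for _, p := range pt.parts {
		fmt.Println("persist:", p.name)
	}
}

func main() {
	a, b := &pw{"0000000000000001"}, &pw{"0000000000000002"}
	pt := &ptSketch{parts: []*pw{a, b}}
	pt.swap(map[*pw]bool{a: true, b: true}, &pw{"0000000000000003"})
}
```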
@@ -1601,8 +1574,10 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 		logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))
 	}
 
-	// Remove partition references from old parts.
+	// Mark old parts as requiring deletion and decrement their reference counts,
+	// so they are eventually closed and deleted.
 	for _, pw := range pws {
+		atomic.StoreUint32(&pw.mustBeDeleted, 1)
 		pw.decRef()
 	}
 }
@@ -1672,62 +1647,35 @@ func (pt *partition) stalePartsRemover() {
 }
 
 func (pt *partition) removeStaleParts() {
-	m := make(map[*partWrapper]bool)
 	startTime := time.Now()
 	retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs
 
+	var pws []*partWrapper
 	pt.partsLock.Lock()
 	for _, pw := range pt.inmemoryParts {
 		if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
 			atomic.AddUint64(&pt.inmemoryRowsDeleted, pw.p.ph.RowsCount)
-			m[pw] = true
+			pw.isInMerge = true
+			pws = append(pws, pw)
 		}
 	}
 	for _, pw := range pt.smallParts {
 		if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
 			atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
-			m[pw] = true
+			pw.isInMerge = true
+			pws = append(pws, pw)
 		}
 	}
 	for _, pw := range pt.bigParts {
 		if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
 			atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
-			m[pw] = true
+			pw.isInMerge = true
+			pws = append(pws, pw)
 		}
 	}
-	removedInmemoryParts := 0
-	removedSmallParts := 0
-	removedBigParts := 0
-	if len(m) > 0 {
-		pt.inmemoryParts, removedInmemoryParts = removeParts(pt.inmemoryParts, m)
-		pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m)
-		pt.bigParts, removedBigParts = removeParts(pt.bigParts, m)
-	}
 	pt.partsLock.Unlock()
 
-	removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
-	if removedParts != len(m) {
-		logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedParts, len(m))
-	}
-
-	// Physically remove stale parts under snapshotLock in order to provide
-	// consistent snapshots with table.CreateSnapshot().
-	pt.snapshotLock.RLock()
-	for pw := range m {
-		if pw.mp == nil {
-			logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.s.retentionMsecs/1000)
-			fs.MustRemoveDirAtomic(pw.p.path)
-		}
-	}
-	// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,
-	// since they should be automatically called inside fs.MustRemoveDirAtomic().
-	pt.snapshotLock.RUnlock()
-
-	// Remove partition references from removed parts.
-	for pw := range m {
-		pw.decRef()
-	}
-
+	pt.swapSrcWithDstParts(pws, nil, partSmall)
 }
 
 // getPartsToMerge returns optimal parts to merge from pws.
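Note: removeStaleParts() now just claims expired parts via the isInMerge flag and hands them to swapSrcWithDstParts() with a nil destination, reusing the same delete-and-rewrite-parts.json path as merges. A sketch of the selection step, assuming millisecond timestamps:

```go
package main

import (
	"fmt"
	"time"
)

type stalePart struct {
	maxTimestampMs int64
	inMerge        bool
}

// selectStale mimics the selection loop: a part is stale when its
// newest sample is older than the retention deadline and it is not
// already claimed by a merge. Claiming via the inMerge flag is what
// lets swapSrcWithDstParts treat retention like an ordinary merge
// that produces no destination part.
func selectStale(parts []*stalePart, retention time.Duration) []*stalePart {
	deadline := time.Now().UnixMilli() - retention.Milliseconds()
	var stale []*stalePart
	for _, p := range parts {
		if !p.inMerge && p.maxTimestampMs < deadline {
			p.inMerge = true
			stale = append(stale, p)
		}
	}
	return stale
}

func main() {
	old := &stalePart{maxTimestampMs: time.Now().Add(-48 * time.Hour).UnixMilli()}
	fresh := &stalePart{maxTimestampMs: time.Now().UnixMilli()}
	stale := selectStale([]*stalePart{old, fresh}, 24*time.Hour)
	fmt.Println("stale parts:", len(stale)) // 1
}
```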
@@ -1868,63 +1816,50 @@ func getPartsSize(pws []*partWrapper) uint64 {
 	return n
 }
 
-func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
+func openParts(path string, partNames []string) ([]*partWrapper, error) {
 	// The path can be missing after restoring from backup, so create it if needed.
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return nil, err
 	}
 	fs.MustRemoveTemporaryDirs(path)
 
-	// Run remaining transactions and cleanup /txn and /tmp directories.
-	// Snapshots cannot be created yet, so use fakeSnapshotLock.
-	var fakeSnapshotLock sync.RWMutex
-	if err := runTransactions(&fakeSnapshotLock, pathPrefix1, pathPrefix2, path); err != nil {
-		return nil, fmt.Errorf("cannot run transactions from %q: %w", path, err)
-	}
+	// Remove txn and tmp directories, which may be left over after the upgrade
+	// to v1.90.0 or newer versions.
+	fs.MustRemoveAll(path + "/txn")
+	fs.MustRemoveAll(path + "/tmp")
 
-	txnDir := path + "/txn"
-	fs.MustRemoveDirAtomic(txnDir)
-	tmpDir := path + "/tmp"
-	fs.MustRemoveDirAtomic(tmpDir)
-	if err := createPartitionDirs(path); err != nil {
-		return nil, fmt.Errorf("cannot create directories for partition %q: %w", path, err)
-	}
-
-	// Open parts.
+	// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
+	// or after the update from versions prior to v1.90.0.
 	des, err := os.ReadDir(path)
 	if err != nil {
-		return nil, fmt.Errorf("cannot read partition directory: %w", err)
+		return nil, fmt.Errorf("cannot read partition dir: %w", err)
+	}
+	m := make(map[string]struct{}, len(partNames))
+	for _, partName := range partNames {
+		m[partName] = struct{}{}
 	}
-	var pws []*partWrapper
 	for _, de := range des {
 		if !fs.IsDirOrSymlink(de) {
 			// Skip non-directories.
 			continue
 		}
 		fn := de.Name()
-		if fn == "snapshots" {
-			// "snapshots" dir is skipped for backwards compatibility. Now it is unused.
-			continue
+		if _, ok := m[fn]; !ok {
+			deletePath := path + "/" + fn
+			fs.MustRemoveAll(deletePath)
 		}
-		if fn == "tmp" || fn == "txn" {
-			// Skip special dirs.
-			continue
-		}
-		partPath := path + "/" + fn
-		if fs.IsEmptyDir(partPath) {
-			// Remove empty directory, which can be left after unclean shutdown on NFS.
-			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
-			fs.MustRemoveDirAtomic(partPath)
-			continue
-		}
-		startTime := time.Now()
+	}
+	fs.MustSyncPath(path)
+
+	// Open parts.
+	var pws []*partWrapper
+	for _, partName := range partNames {
+		partPath := path + "/" + partName
 		p, err := openFilePart(partPath)
 		if err != nil {
 			mustCloseParts(pws)
			return nil, fmt.Errorf("cannot open part %q: %w", partPath, err)
 		}
-		logger.Infof("opened part %q in %.3f seconds", partPath, time.Since(startTime).Seconds())
-
 		pw := &partWrapper{
 			p:        p,
 			refCount: 1,
@@ -1946,8 +1881,7 @@ func mustCloseParts(pws []*partWrapper) {
 
 // CreateSnapshotAt creates pt snapshot at the given smallPath and bigPath dirs.
 //
-// Snapshot is created using linux hard links, so it is usually created
-// very quickly.
+// Snapshot is created using linux hard links, so it is usually created very quickly.
 func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
 	logger.Infof("creating partition snapshot of %q and %q...", pt.smallPartsPath, pt.bigPartsPath)
 	startTime := time.Now()
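Note: openParts() now treats parts.json as the source of truth and deletes any part directory not listed there. A simplified sketch of that reconciliation (no symlink handling, and errors are returned instead of logged fatally):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// pruneUnknownDirs removes every subdirectory of dir that is not listed
// in keep — the same set-membership cleanup openParts performs against
// the names from parts.json.
func pruneUnknownDirs(dir string, keep []string) error {
	m := make(map[string]struct{}, len(keep))
	for _, name := range keep {
		m[name] = struct{}{}
	}
	des, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	for _, de := range des {
		if !de.IsDir() {
			continue
		}
		if _, ok := m[de.Name()]; !ok {
			if err := os.RemoveAll(filepath.Join(dir, de.Name())); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "partition")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	os.Mkdir(filepath.Join(dir, "000000000000002A"), 0o755) // registered part
	os.Mkdir(filepath.Join(dir, "leftover"), 0o755)         // stale dir
	if err := pruneUnknownDirs(dir, []string{"000000000000002A"}); err != nil {
		panic(err)
	}
	des, _ := os.ReadDir(dir)
	fmt.Println("remaining dirs:", len(des)) // 1
}
```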
@@ -1955,15 +1889,32 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
 
 	// Flush inmemory data to disk.
 	pt.flushInmemoryRows()
 
-	// The snapshot must be created under the lock in order to prevent from
-	// concurrent modifications via runTransaction.
-	pt.snapshotLock.Lock()
-	defer pt.snapshotLock.Unlock()
+	pt.partsLock.Lock()
+	incRefForParts(pt.smallParts)
+	pwsSmall := append([]*partWrapper{}, pt.smallParts...)
+	incRefForParts(pt.bigParts)
+	pwsBig := append([]*partWrapper{}, pt.bigParts...)
+	pt.partsLock.Unlock()
 
-	if err := pt.createSnapshot(pt.smallPartsPath, smallPath); err != nil {
+	defer func() {
+		pt.PutParts(pwsSmall)
+		pt.PutParts(pwsBig)
+	}()
+
+	if err := fs.MkdirAllFailIfExist(smallPath); err != nil {
+		return fmt.Errorf("cannot create snapshot dir %q: %w", smallPath, err)
+	}
+	if err := fs.MkdirAllFailIfExist(bigPath); err != nil {
+		return fmt.Errorf("cannot create snapshot dir %q: %w", bigPath, err)
+	}
+
+	// Create a file with part names at smallPath.
+	mustWritePartNames(pwsSmall, pwsBig, smallPath)
+
+	if err := pt.createSnapshot(pt.smallPartsPath, smallPath, pwsSmall); err != nil {
 		return fmt.Errorf("cannot create snapshot for %q: %w", pt.smallPartsPath, err)
 	}
-	if err := pt.createSnapshot(pt.bigPartsPath, bigPath); err != nil {
+	if err := pt.createSnapshot(pt.bigPartsPath, bigPath, pwsBig); err != nil {
 		return fmt.Errorf("cannot create snapshot for %q: %w", pt.bigPartsPath, err)
 	}
 
@@ -1975,202 +1926,115 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
 // createSnapshot creates a snapshot from srcDir to dstDir.
 //
 // The caller is responsible for deleting dstDir if createSnapshot() returns error.
-func (pt *partition) createSnapshot(srcDir, dstDir string) error {
-	if err := fs.MkdirAllFailIfExist(dstDir); err != nil {
-		return fmt.Errorf("cannot create snapshot dir %q: %w", dstDir, err)
-	}
-
-	des, err := os.ReadDir(srcDir)
-	if err != nil {
-		return fmt.Errorf("cannot read partition directory: %w", err)
-	}
-	for _, de := range des {
-		fn := de.Name()
-		if !fs.IsDirOrSymlink(de) {
-			if fn == "appliedRetention.txt" {
-				// Copy the appliedRetention.txt file to dstDir.
-				// This file can be created by VictoriaMetrics enterprise.
-				// See https://docs.victoriametrics.com/#retention-filters .
-				// Do not make hard link to this file, since it can be modified over time.
-				srcPath := srcDir + "/" + fn
-				dstPath := dstDir + "/" + fn
-				if err := fs.CopyFile(srcPath, dstPath); err != nil {
-					return fmt.Errorf("cannot copy %q to %q: %w", srcPath, dstPath, err)
-				}
-			}
-			// Skip non-directories.
-			continue
-		}
-		if fn == "tmp" || fn == "txn" || fs.IsScheduledForRemoval(fn) {
-			// Skip special dirs.
-			continue
-		}
-		srcPartPath := srcDir + "/" + fn
-		dstPartPath := dstDir + "/" + fn
+func (pt *partition) createSnapshot(srcDir, dstDir string, pws []*partWrapper) error {
+	// Make hard links for pws at dstDir.
+	for _, pw := range pws {
+		srcPartPath := pw.p.path
+		dstPartPath := dstDir + "/" + filepath.Base(srcPartPath)
 		if err := fs.HardLinkFiles(srcPartPath, dstPartPath); err != nil {
 			return fmt.Errorf("cannot create hard links from %q to %q: %w", srcPartPath, dstPartPath, err)
 		}
 	}
 
+	// Copy the appliedRetention.txt file to dstDir.
+	// This file can be created by VictoriaMetrics enterprise.
+	// See https://docs.victoriametrics.com/#retention-filters .
+	// Do not make hard link to this file, since it can be modified over time.
+	srcPath := srcDir + "/appliedRetention.txt"
+	if fs.IsPathExist(srcPath) {
+		dstPath := dstDir + "/" + filepath.Base(srcPath)
+		if err := fs.CopyFile(srcPath, dstPath); err != nil {
+			return fmt.Errorf("cannot copy %q to %q: %w", srcPath, dstPath, err)
+		}
+	}
+
 	fs.MustSyncPath(dstDir)
-	fs.MustSyncPath(filepath.Dir(dstDir))
+	parentDir := filepath.Dir(dstDir)
+	fs.MustSyncPath(parentDir)
 
 	return nil
 }
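Note: the snapshot path above pins parts via refcounts instead of the removed snapshotLock, then hard-links their files. A sketch of the hard-link step, assuming fs.HardLinkFiles links every regular file of a part directory (the real helper may differ); since finished parts are immutable, the snapshot shares data blocks with the live part and costs almost nothing:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// hardLinkDir links every regular file from srcDir into dstDir.
func hardLinkDir(srcDir, dstDir string) error {
	if err := os.MkdirAll(dstDir, 0o755); err != nil {
		return err
	}
	des, err := os.ReadDir(srcDir)
	if err != nil {
		return err
	}
	for _, de := range des {
		if de.IsDir() {
			continue
		}
		if err := os.Link(filepath.Join(srcDir, de.Name()), filepath.Join(dstDir, de.Name())); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	src, err := os.MkdirTemp("", "part")
	if err != nil {
		panic(err)
	}
	dst, err := os.MkdirTemp("", "snapshot")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(src)
	defer os.RemoveAll(dst)

	os.WriteFile(filepath.Join(src, "values.bin"), []byte("data"), 0o644)
	if err := hardLinkDir(src, filepath.Join(dst, filepath.Base(src))); err != nil {
		panic(err)
	}
	fmt.Println("snapshot created via hard links")
}
```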
-func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path string) error {
-	// Wait until all the previous pending transaction deletions are finished.
-	pendingTxnDeletionsWG.Wait()
+type partNamesJSON struct {
+	Small []string
+	Big   []string
+}
 
-	// Make sure all the current transaction deletions are finished before exiting.
-	defer pendingTxnDeletionsWG.Wait()
+func mustWritePartNames(pwsSmall, pwsBig []*partWrapper, dstDir string) {
+	partNamesSmall := getPartNames(pwsSmall)
+	partNamesBig := getPartNames(pwsBig)
+	partNames := &partNamesJSON{
+		Small: partNamesSmall,
+		Big:   partNamesBig,
+	}
+	data, err := json.Marshal(partNames)
+	if err != nil {
+		logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
+	}
+	partNamesPath := dstDir + "/parts.json"
+	if err := fs.WriteFileAtomically(partNamesPath, data, true); err != nil {
+		logger.Panicf("FATAL: cannot update %s: %s", partNamesPath, err)
+	}
+}
 
-	txnDir := path + "/txn"
-	des, err := os.ReadDir(txnDir)
+func getPartNames(pws []*partWrapper) []string {
+	partNames := make([]string, 0, len(pws))
+	for _, pw := range pws {
+		if pw.mp != nil {
+			// Skip in-memory parts.
+			continue
+		}
+		partName := filepath.Base(pw.p.path)
+		partNames = append(partNames, partName)
+	}
+	sort.Strings(partNames)
+	return partNames
+}
+
+func mustReadPartNames(smallPartsPath, bigPartsPath string) ([]string, []string) {
+	partNamesPath := smallPartsPath + "/parts.json"
+	data, err := os.ReadFile(partNamesPath)
+	if err == nil {
+		var partNames partNamesJSON
+		if err := json.Unmarshal(data, &partNames); err != nil {
+			logger.Panicf("FATAL: cannot parse %s: %s", partNamesPath, err)
+		}
+		return partNames.Small, partNames.Big
+	}
+	if !os.IsNotExist(err) {
+		logger.Panicf("FATAL: cannot read parts.json file: %s", err)
+	}
+	// The parts.json is missing. This is the upgrade from versions prior to v1.90.0.
+	// Read part names from the smallPartsPath and bigPartsPath directories.
+	partNamesSmall := mustReadPartNamesFromDir(smallPartsPath)
+	partNamesBig := mustReadPartNamesFromDir(bigPartsPath)
+	return partNamesSmall, partNamesBig
+}
+
+func mustReadPartNamesFromDir(srcDir string) []string {
+	des, err := os.ReadDir(srcDir)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return nil
 		}
-		return fmt.Errorf("cannot read transaction directory: %w", err)
+		logger.Panicf("FATAL: cannot read partition dir: %s", err)
 	}
-
-	// Sort transaction files by id.
-	sort.Slice(des, func(i, j int) bool {
-		return des[i].Name() < des[j].Name()
-	})
-
+	var partNames []string
 	for _, de := range des {
-		fn := de.Name()
-		if fs.IsTemporaryFileName(fn) {
-			// Skip temporary files, which could be left after unclean shutdown.
+		if !fs.IsDirOrSymlink(de) {
+			// Skip non-directories.
 			continue
 		}
-		txnPath := txnDir + "/" + fn
-		if err := runTransaction(txnLock, pathPrefix1, pathPrefix2, txnPath); err != nil {
-			return fmt.Errorf("cannot run transaction from %q: %w", txnPath, err)
+		partName := de.Name()
+		if isSpecialDir(partName) {
+			// Skip special dirs.
+			continue
 		}
+		partNames = append(partNames, partName)
 	}
-	return nil
+	return partNames
 }
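Note: correctness of the new scheme hinges on parts.json being replaced atomically. fs.WriteFileAtomically presumably follows the classic tmp-file-then-rename recipe sketched below (an assumption about the helper's internals; a fully durable variant would also fsync the parent directory):

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

type partNames struct {
	Small []string
	Big   []string
}

// writeFileAtomically writes data to a temporary file, flushes it, and
// renames it over path: readers of parts.json observe either the old
// or the new contents, never a torn write.
func writeFileAtomically(path string, data []byte) error {
	tmp := path + ".tmp"
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // flush contents before the rename
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, path) // atomic replacement on POSIX filesystems
}

func main() {
	dir, err := os.MkdirTemp("", "partition")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	data, err := json.Marshal(&partNames{Small: []string{"000000000000002A"}})
	if err != nil {
		panic(err)
	}
	if err := writeFileAtomically(filepath.Join(dir, "parts.json"), data); err != nil {
		panic(err)
	}
	fmt.Println("parts.json updated atomically")
}
```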
 
-func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath string) error {
-	// The transaction must run under read lock in order to provide
-	// consistent snapshots with partition.CreateSnapshot().
-	txnLock.RLock()
-	defer txnLock.RUnlock()
-
-	data, err := os.ReadFile(txnPath)
-	if err != nil {
-		return fmt.Errorf("cannot read transaction file: %w", err)
-	}
-	if len(data) > 0 && data[len(data)-1] == '\n' {
-		data = data[:len(data)-1]
-	}
-	paths := strings.Split(string(data), "\n")
-
-	if len(paths) == 0 {
-		return fmt.Errorf("empty transaction")
-	}
-	rmPaths := paths[:len(paths)-1]
-	mvPaths := strings.Split(paths[len(paths)-1], " -> ")
-	if len(mvPaths) != 2 {
-		return fmt.Errorf("invalid last line in the transaction file: got %q; must contain `srcPath -> dstPath`", paths[len(paths)-1])
-	}
-
-	// Remove old paths. It is OK if certain paths don't exist.
-	for _, path := range rmPaths {
-		path, err := validatePath(pathPrefix1, pathPrefix2, path)
-		if err != nil {
-			return fmt.Errorf("invalid path to remove: %w", err)
-		}
-		fs.MustRemoveDirAtomic(path)
-	}
-
-	// Move the new part to new directory.
-	srcPath := mvPaths[0]
-	dstPath := mvPaths[1]
-	if len(srcPath) > 0 {
-		srcPath, err = validatePath(pathPrefix1, pathPrefix2, srcPath)
-		if err != nil {
-			return fmt.Errorf("invalid source path to rename: %w", err)
-		}
-		if len(dstPath) > 0 {
-			// Move srcPath to dstPath.
-			dstPath, err = validatePath(pathPrefix1, pathPrefix2, dstPath)
-			if err != nil {
-				return fmt.Errorf("invalid destination path to rename: %w", err)
-			}
-			if fs.IsPathExist(srcPath) {
-				if err := os.Rename(srcPath, dstPath); err != nil {
-					return fmt.Errorf("cannot rename %q to %q: %w", srcPath, dstPath, err)
-				}
-			} else if !fs.IsPathExist(dstPath) {
-				// Emit info message for the expected condition after unclean shutdown on NFS disk.
-				// The dstPath part may be missing because it could be already merged into bigger part
-				// while old source parts for the current txn weren't still deleted due to NFS locks.
-				logger.Infof("cannot find both source and destination paths: %q -> %q; this may be the case after unclean shutdown "+
-					"(OOM, `kill -9`, hard reset) on NFS disk", srcPath, dstPath)
-			}
-		} else {
-			// Just remove srcPath.
-			fs.MustRemoveDirAtomic(srcPath)
-		}
-	}
-
-	// Flush pathPrefix* directory metadata to the underlying storage,
-	// so the moved files become visible there.
-	fs.MustSyncPath(pathPrefix1)
-	fs.MustSyncPath(pathPrefix2)
-
-	pendingTxnDeletionsWG.Add(1)
-	go func() {
-		defer pendingTxnDeletionsWG.Done()
-
-		// There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal,
-		// since it is already called by fs.MustRemoveDirAtomic.
-
-		if err := os.Remove(txnPath); err != nil {
-			logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
-		}
-	}()
-
-	return nil
-}
-
-var pendingTxnDeletionsWG syncwg.WaitGroup
-
-func validatePath(pathPrefix1, pathPrefix2, path string) (string, error) {
-	var err error
-
-	pathPrefix1, err = filepath.Abs(pathPrefix1)
-	if err != nil {
-		return path, fmt.Errorf("cannot determine absolute path for pathPrefix1=%q: %w", pathPrefix1, err)
-	}
-	pathPrefix2, err = filepath.Abs(pathPrefix2)
-	if err != nil {
-		return path, fmt.Errorf("cannot determine absolute path for pathPrefix2=%q: %w", pathPrefix2, err)
-	}
-
-	path, err = filepath.Abs(path)
-	if err != nil {
-		return path, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
-	}
-	if !strings.HasPrefix(path, pathPrefix1+"/") && !strings.HasPrefix(path, pathPrefix2+"/") {
-		return path, fmt.Errorf("invalid path %q; must start with either %q or %q", path, pathPrefix1+"/", pathPrefix2+"/")
-	}
-	return path, nil
-}
-
-func createPartitionDirs(path string) error {
-	path = filepath.Clean(path)
-	txnPath := path + "/txn"
-	if err := fs.MkdirAllFailIfExist(txnPath); err != nil {
-		return fmt.Errorf("cannot create txn directory %q: %w", txnPath, err)
-	}
-	tmpPath := path + "/tmp"
-	if err := fs.MkdirAllFailIfExist(tmpPath); err != nil {
-		return fmt.Errorf("cannot create tmp directory %q: %w", tmpPath, err)
-	}
-	fs.MustSyncPath(path)
-	return nil
+func isSpecialDir(name string) bool {
+	return name == "tmp" || name == "txn" || name == "snapshots" || fs.IsScheduledForRemoval(name)
 }
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index bfbebfc42..d020ae9f9 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -160,8 +160,8 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
 	if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
 		logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
-		// Do not use fs.MustRemoveDirAtomic() here, since the cache directory may be mounted
-		// to a separate filesystem. In this case the fs.MustRemoveDirAtomic() will fail while
+		// Do not use fs.MustRemoveAll() here, since the cache directory may be mounted
+		// to a separate filesystem. In this case fs.MustRemoveAll() will fail while
 		// trying to remove the mount root.
 		fs.RemoveDirContents(s.cachePath)
 		logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
@@ -2377,7 +2377,7 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
 	for _, tn := range tableNames[:len(tableNames)-2] {
 		pathToRemove := path + "/" + tn
 		logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
-		fs.MustRemoveDirAtomic(pathToRemove)
+		fs.MustRemoveAll(pathToRemove)
 		logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
 	}
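Note: fs.RemoveDirContents is kept for the cache directory precisely because it empties the directory without removing the directory itself, which would fail with EBUSY on a mount point. A sketch of that behavior (an assumption about the helper; the real implementation may add logging and stricter error handling):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// removeDirContents empties dir without removing dir itself, so it
// works even when dir is the root of a mounted filesystem.
func removeDirContents(dir string) error {
	des, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	for _, de := range des {
		if err := os.RemoveAll(filepath.Join(dir, de.Name())); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "cache")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	os.WriteFile(filepath.Join(dir, "reset_cache_on_startup"), nil, 0o644)
	if err := removeDirContents(dir); err != nil {
		panic(err)
	}
	fmt.Println("cache emptied; directory itself preserved")
}
```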