Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2025-03-01 15:33:35 +00:00
Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
Commit 99aeb3b21b
31 changed files with 274 additions and 198 deletions
Makefile (10 changes)

@@ -141,6 +141,8 @@ vmutils-windows-amd64: \
 	vmagent-windows-amd64 \
 	vmalert-windows-amd64 \
 	vmauth-windows-amd64 \
+	vmbackup-windows-amd64 \
+	vmrestore-windows-amd64 \
 	vmctl-windows-amd64

 victoria-metrics-crossbuild: \
@@ -309,23 +311,31 @@ release-vmutils-windows-goarch: \
 	vmagent-windows-$(GOARCH)-prod \
 	vmalert-windows-$(GOARCH)-prod \
 	vmauth-windows-$(GOARCH)-prod \
+	vmbackup-windows-$(GOARCH)-prod \
+	vmrestore-windows-$(GOARCH)-prod \
 	vmctl-windows-$(GOARCH)-prod
 	cd bin && \
 		zip vmutils-windows-$(GOARCH)-$(PKG_TAG).zip \
 			vmagent-windows-$(GOARCH)-prod.exe \
 			vmalert-windows-$(GOARCH)-prod.exe \
 			vmauth-windows-$(GOARCH)-prod.exe \
+			vmbackup-windows-$(GOARCH)-prod.exe \
+			vmrestore-windows-$(GOARCH)-prod.exe \
 			vmctl-windows-$(GOARCH)-prod.exe \
 		&& sha256sum vmutils-windows-$(GOARCH)-$(PKG_TAG).zip \
 			vmagent-windows-$(GOARCH)-prod.exe \
 			vmalert-windows-$(GOARCH)-prod.exe \
 			vmauth-windows-$(GOARCH)-prod.exe \
+			vmbackup-windows-$(GOARCH)-prod.exe \
+			vmrestore-windows-$(GOARCH)-prod.exe \
 			vmctl-windows-$(GOARCH)-prod.exe \
 			> vmutils-windows-$(GOARCH)-$(PKG_TAG)_checksums.txt
 	cd bin && rm -rf \
 		vmagent-windows-$(GOARCH)-prod.exe \
 		vmalert-windows-$(GOARCH)-prod.exe \
 		vmauth-windows-$(GOARCH)-prod.exe \
+		vmbackup-windows-$(GOARCH)-prod.exe \
+		vmrestore-windows-$(GOARCH)-prod.exe \
 		vmctl-windows-$(GOARCH)-prod.exe

 pprof-cpu:
@@ -39,6 +39,9 @@ vmbackup-freebsd-amd64-prod:
 vmbackup-openbsd-amd64-prod:
 	APP_NAME=vmbackup $(MAKE) app-via-docker-openbsd-amd64

+vmbackup-windows-amd64-prod:
+	APP_NAME=vmbackup $(MAKE) app-via-docker-windows-amd64
+
 package-vmbackup:
 	APP_NAME=vmbackup $(MAKE) package-via-docker

@@ -93,5 +96,8 @@ vmbackup-freebsd-amd64:
 vmbackup-openbsd-amd64:
 	APP_NAME=vmbackup CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch

+vmbackup-windows-amd64:
+	GOARCH=amd64 APP_NAME=vmbackup $(MAKE) app-local-windows-goarch
+
 vmbackup-pure:
 	APP_NAME=vmbackup $(MAKE) app-local-pure
@@ -39,6 +39,9 @@ vmrestore-freebsd-amd64-prod:
 vmrestore-openbsd-amd64-prod:
 	APP_NAME=vmrestore $(MAKE) app-via-docker-openbsd-amd64

+vmrestore-windows-amd64-prod:
+	APP_NAME=vmrestore $(MAKE) app-via-docker-windows-amd64
+
 package-vmrestore:
 	APP_NAME=vmrestore $(MAKE) package-via-docker

@@ -93,5 +96,8 @@ vmrestore-freebsd-amd64:
 vmrestore-openbsd-amd64:
 	APP_NAME=vmrestore CGO_ENABLED=0 GOOS=openbsd GOARCH=amd64 $(MAKE) app-local-goos-goarch

+vmrestore-windows-amd64:
+	GOARCH=amd64 APP_NAME=vmrestore $(MAKE) app-local-windows-goarch
+
 vmrestore-pure:
 	APP_NAME=vmrestore $(MAKE) app-local-pure
@@ -29,7 +29,7 @@ VictoriaMetrics is production-ready for the following operating systems:
 * OpenBSD
 * Solaris/SmartOS

-Some VictoriaMetrics components ([vmagent](https://docs.victoriametrics.com/vmagent.html), [vmalert](https://docs.victoriametrics.com/vmalert.html) and [vmauth](https://docs.victoriametrics.com/vmauth.html)) can run on Windows.
+There is an experimental support of VictoriaMetrics components for Windows.

 VictoriaMetrics can run also on MacOS for testing and development purposes.

@@ -40,7 +40,7 @@ VictoriaMetrics can run also on MacOS for testing and development purposes.
 * **OpenBSD**: i386, amd64, arm
 * **Solaris/SmartOS**: i386, amd64
 * **MacOS**: amd64, arm64 (for testing and development purposes)
-* **Windows**: amd64 (supported by [vmagent](https://docs.victoriametrics.com/vmagent.html), [vmalert](https://docs.victoriametrics.com/vmalert.html) and [vmauth](https://docs.victoriametrics.com/vmauth.html)).
+* **Windows**: amd64

 ## Upgrade procedure
@@ -19,7 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 so the previous versions of VictoriaMetrics will exit with the `unexpected number of substrings in the part name` error when trying to run them on the data
 created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or newer releases**

-* FEATURE: publish VictoriaMetrics binaries for Windows. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) issues.
+* FEATURE: release Windows binaries for [single-node VictoriaMetrics](https://docs.victoriametrics.com/), [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), [vmbackup](https://docs.victoriametrics.com/vmbackup.html) and [vmrestore](https://docs.victoriametrics.com/vmrestore.html). See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) issues.
 * FEATURE: log metrics with truncated labels if the length of label value in the ingested metric exceeds `-maxLabelValueLen`. This should simplify debugging for this case.
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show target URL when debugging [target relabeling](https://docs.victoriametrics.com/vmagent.html#relabel-debug). This should simplify target relabel debugging a bit. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3882).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [VictoriaMetrics remote write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol) when [sending / receiving data to / from Kafka](https://docs.victoriametrics.com/vmagent.html#kafka-integration). This protocol allows saving egress network bandwidth costs when sending data from `vmagent` to `Kafka` located in another datacenter or availability zone. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1225).
@@ -8,6 +8,7 @@ import (
 	"sync/atomic"
 	"time"

+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
@@ -209,7 +210,7 @@ func (sw *statWriter) Write(p []byte) (int, error) {
 }

 func createRestoreLock(dstDir string) error {
-	lockF := path.Join(dstDir, "restore-in-progress")
+	lockF := path.Join(dstDir, backupnames.RestoreInProgressFilename)
 	f, err := os.Create(lockF)
 	if err != nil {
 		return fmt.Errorf("cannot create restore lock file %q: %w", lockF, err)
@@ -218,7 +219,7 @@ func createRestoreLock(dstDir string) error {
 }

 func removeRestoreLock(dstDir string) error {
-	lockF := path.Join(dstDir, "restore-in-progress")
+	lockF := path.Join(dstDir, backupnames.RestoreInProgressFilename)
 	if err := os.Remove(lockF); err != nil {
 		return fmt.Errorf("cannote remove restore lock file %q: %w", lockF, err)
 	}
lib/backup/backupnames/filenames.go (new file, 9 lines)

@@ -0,0 +1,9 @@
+package backupnames
+
+const (
+	// RestoreInProgressFilename is the filename for "restore in progress" file
+	//
+	// This file is created at the beginning of the restore process and is deleted at the end of the restore process.
+	// If this file exists, then it is unsafe to read the storage data, since it can be incomplete.
+	RestoreInProgressFilename = "restore-in-progress"
+)
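This constant gives vmrestore and the storage layer a single shared name for the restore lock. As a rough sketch of the lock-file pattern it supports (not the repository's code; the helper names below are hypothetical):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// restoreInProgressFilename mirrors backupnames.RestoreInProgressFilename.
const restoreInProgressFilename = "restore-in-progress"

// createLock marks dstDir as unsafe to read until the restore completes.
func createLock(dstDir string) error {
	f, err := os.Create(filepath.Join(dstDir, restoreInProgressFilename))
	if err != nil {
		return fmt.Errorf("cannot create restore lock: %w", err)
	}
	return f.Close()
}

// removeLock marks the restore as complete.
func removeLock(dstDir string) error {
	return os.Remove(filepath.Join(dstDir, restoreInProgressFilename))
}

func main() {
	dir := os.TempDir()
	if err := createLock(dir); err != nil {
		panic(err)
	}
	// ... copy backup data into dir here ...
	if err := removeLock(dir); err != nil {
		panic(err)
	}
	fmt.Println("restore finished; lock removed")
}
```

A reader that finds the lock file (as `OpenStorage` does below) should refuse to open the data, since the restore may have been interrupted midway.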
@@ -6,6 +6,7 @@ import (
 	"path/filepath"
 	"strings"

+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )

@@ -135,7 +136,7 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) {
 }

 func isSpecialFile(name string) bool {
-	return name == "flock.lock" || name == "restore-in-progress"
+	return name == "flock.lock" || name == backupnames.RestoreInProgressFilename
 }

 // RemoveEmptyDirs recursively removes empty directories under the given dir.
@@ -249,7 +250,7 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
 	if dirEntries > 0 {
 		return false, nil
 	}
-	// Use os.RemoveAll() instead of os.Remove(), since the dir may contain special files such as flock.lock and restore-in-progress,
+	// Use os.RemoveAll() instead of os.Remove(), since the dir may contain special files such as flock.lock and backupnames.RestoreInProgressFilename,
 	// which must be ignored.
 	if err := os.RemoveAll(dir); err != nil {
 		return false, fmt.Errorf("cannot remove %q: %w", dir, err)
@@ -1,6 +1,17 @@
 package filestream

+import (
+	"fmt"
+
+	"golang.org/x/sys/windows"
+)
+
 func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error {
+	if fdatasync && st.fd > 0 {
+		if err := windows.FlushFileBuffers(windows.Handle(st.fd)); err != nil {
+			return fmt.Errorf("windows.Fsync error: %w", err)
+		}
+	}
 	return nil
 }
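FlushFileBuffers is the Windows counterpart of the fdatasync call used on Unix-like systems. A minimal, hedged usage sketch (Windows-only; assumes only `golang.org/x/sys/windows`, the same dependency the diff adds):

```go
//go:build windows

package main

import (
	"os"

	"golang.org/x/sys/windows"
)

func main() {
	f, err := os.Create("example.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.Write([]byte("durable bytes")); err != nil {
		panic(err)
	}
	// FlushFileBuffers forces written data to disk, playing the role
	// fdatasync plays on Unix-like systems.
	if err := windows.FlushFileBuffers(windows.Handle(f.Fd())); err != nil {
		panic(err)
	}
}
```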
@@ -3,6 +3,8 @@ package fs
 import (
 	"fmt"
 	"os"
+	"reflect"
+	"sync"
 	"unsafe"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -10,27 +12,20 @@ import (
 )

 var (
-	kernelDLL            = windows.MustLoadDLL("kernel32.dll")
-	procLock             = kernelDLL.MustFindProc("LockFileEx")
-	procEvent            = kernelDLL.MustFindProc("CreateEventW")
-	procDisk             = kernelDLL.MustFindProc("GetDiskFreeSpaceExW")
-	ntDLL                = windows.MustLoadDLL("ntdll.dll")
-	ntSetInformationProc = ntDLL.MustFindProc("NtSetInformationFile")
+	kernelDLL = windows.MustLoadDLL("kernel32.dll")
+	procLock  = kernelDLL.MustFindProc("LockFileEx")
+	procEvent = kernelDLL.MustFindProc("CreateEventW")
+	procDisk  = kernelDLL.MustFindProc("GetDiskFreeSpaceExW")
 )

-// panic at windows, if file already open by another process.
-// one of possible solutions - change files opening process with correct flags.
-// https://github.com/dgraph-io/badger/issues/699
-// https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-flushfilebuffers
-func mustSyncPath(string) {
+// at windows only files could be synced
+// Sync for directories is not supported.
+func mustSyncPath(path string) {
 }

 const (
 	lockfileExclusiveLock = 2
 	fileFlagNormal        = 0x00000080
-	// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/ntddk/ns-ntddk-_file_disposition_information_ex
-	fileDispositionPosixSemantics          = 0x00000002
-	fileDispositionIgnoreReadonlyAttribute = 0x00000010
 )

 // https://github.com/juju/fslock/blob/master/fslock_windows.go
@@ -55,7 +50,6 @@ func createFlockFile(flockFile string) (*os.File, error) {
 		return nil, fmt.Errorf("cannot create Overlapped handler: %w", err)
 	}
 	// https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-lockfileex
-	// overlapped is dropped?
 	r1, _, err := procLock.Call(uintptr(handle), uintptr(lockfileExclusiveLock), uintptr(0), uintptr(1), uintptr(0), uintptr(unsafe.Pointer(ol)))
 	if r1 == 0 {
 		return nil, err
@@ -63,14 +57,58 @@ func createFlockFile(flockFile string) (*os.File, error) {
 	return os.NewFile(uintptr(handle), flockFile), nil
 }

-// stub
+var (
+	mmapByAddrLock sync.Mutex
+	mmapByAddr     = map[uintptr]windows.Handle{}
+)
+
 func mmap(fd int, length int) ([]byte, error) {
-	return nil, nil
+	flProtect := uint32(windows.PAGE_READONLY)
+	dwDesiredAccess := uint32(windows.FILE_MAP_READ)
+	// https://learn.microsoft.com/en-us/windows/win32/memory/creating-a-file-mapping-object#file-mapping-size
+	// do not specify any length params, windows will set it according to the file size.
+	// If length > file size, truncate is required according to api definition, we don't want it.
+	h, errno := windows.CreateFileMapping(windows.Handle(fd), nil, flProtect, 0, 0, nil)
+	if h == 0 {
+		return nil, os.NewSyscallError("CreateFileMapping", errno)
+	}
+	addr, errno := windows.MapViewOfFile(h, dwDesiredAccess, 0, 0, 0)
+	if addr == 0 {
+		windows.CloseHandle(h)
+		return nil, os.NewSyscallError("MapViewOfFile", errno)
+	}
+	data := make([]byte, 0)
+	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&data))
+	hdr.Data = addr
+	hdr.Len = length
+	hdr.Cap = hdr.Len
+
+	mmapByAddrLock.Lock()
+	mmapByAddr[addr] = h
+	mmapByAddrLock.Unlock()
+
+	return data, nil
 }

-// stub
-func mUnmap([]byte) error {
-	return nil
+func mUnmap(data []byte) error {
+	// flush is not needed, since we perform only reading operation.
+	// In case of write, additional call FlushViewOfFile must be performed.
+	header := (*reflect.SliceHeader)(unsafe.Pointer(&data))
+	addr := header.Data
+
+	mmapByAddrLock.Lock()
+	h, ok := mmapByAddr[addr]
+	if !ok {
+		logger.Fatalf("BUG: unmapping for non exist addr: %d", addr)
+	}
+	delete(mmapByAddr, addr)
+	mmapByAddrLock.Unlock()
+
+	if err := windows.UnmapViewOfFile(addr); err != nil {
+		return fmt.Errorf("cannot unmap memory mapped file: %w", err)
+	}
+	errno := windows.CloseHandle(h)
+	return os.NewSyscallError("CloseHandle", errno)
 }

 func mustGetFreeSpace(path string) uint64 {
@@ -78,8 +116,7 @@ func mustGetFreeSpace(path string) uint64 {
 	r, _, err := procDisk.Call(uintptr(unsafe.Pointer(windows.StringToUTF16Ptr(path))),
 		uintptr(unsafe.Pointer(&freeBytes)))
 	if r == 0 {
-		logger.Errorf("cannot get free space: %v", err)
-		return 0
+		logger.Panicf("FATAL: cannot get free space for %q : %s", path, err)
 	}
 	return uint64(freeBytes)
 }
@@ -109,38 +146,3 @@ func createEvent(sa *windows.SecurityAttributes, name *uint16) (windows.Handle,
 	}
 	return handle, nil
 }
-
-// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/ntddk/ns-ntddk-_file_disposition_information_ex
-type fileDispositionInformationEx struct {
-	Flags uint32
-}
-
-// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/ns-wdm-_io_status_block
-type ioStatusBlock struct {
-	Status, Information uintptr
-}
-
-// UpdateFileHandle - changes file deletion semantic at windows to posix-like.
-func UpdateFileHandle(path string) error {
-	handle, err := windows.Open(path, windows.GENERIC_READ|windows.DELETE, windows.FILE_SHARE_READ|windows.FILE_SHARE_DELETE)
-	if err != nil {
-		return err
-	}
-	return setPosixDelete(handle)
-}
-
-// supported starting with Windows 10, version 1709.
-// supported by NTFS only.
-func setPosixDelete(handle windows.Handle) error {
-	var iosb ioStatusBlock
-	flags := fileDispositionInformationEx{
-		Flags: fileDispositionPosixSemantics | fileDispositionIgnoreReadonlyAttribute,
-	}
-	// class FileDispositionInformationEx, // 64
-	// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/ne-wdm-_file_information_class
-	r0, _, err := ntSetInformationProc.Call(uintptr(handle), uintptr(unsafe.Pointer(&iosb)), uintptr(unsafe.Pointer(&flags)), unsafe.Sizeof(flags), uintptr(64))
-	if r0 == 0 {
-		return nil
-	}
-	return fmt.Errorf("cannot set file disposition information: NT_STATUS: 0x%X, error: %w", r0, err)
-}
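The new `mmap` keeps a process-global map from mapped address to file-mapping handle because `UnmapViewOfFile` receives only the base address, while `CloseHandle` needs the mapping handle created earlier. A hedged, Windows-only usage sketch of the same two-step API (standalone illustration, not the repository's code):

```go
//go:build windows

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/windows"
)

func main() {
	f, err := os.Open("example.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Zero sizes let Windows size the mapping to the whole file,
	// matching the comment in the diff above.
	h, err := windows.CreateFileMapping(windows.Handle(f.Fd()), nil, windows.PAGE_READONLY, 0, 0, nil)
	if err != nil {
		panic(err)
	}
	addr, err := windows.MapViewOfFile(h, windows.FILE_MAP_READ, 0, 0, 0)
	if err != nil {
		panic(err)
	}

	// ... read the mapped memory at addr here ...
	fmt.Printf("mapped at %#x\n", addr)

	// Unmapping needs the address; closing needs the handle.
	// Hence the mmapByAddr bookkeeping in the patch above.
	if err := windows.UnmapViewOfFile(addr); err != nil {
		panic(err)
	}
	if err := windows.CloseHandle(h); err != nil {
		panic(err)
	}
}
```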
@@ -147,7 +147,7 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 		return fmt.Errorf("cannot read metadata from %q: %w", path, err)
 	}

-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	metaindexFile, err := filestream.Open(metaindexPath, true)
 	if err != nil {
 		return fmt.Errorf("cannot open metaindex file in stream mode: %w", err)
@@ -158,20 +158,20 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 		return fmt.Errorf("cannot unmarshal metaindex rows from file %q: %w", metaindexPath, err)
 	}

-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	indexFile, err := filestream.Open(indexPath, true)
 	if err != nil {
 		return fmt.Errorf("cannot open index file in stream mode: %w", err)
 	}

-	itemsPath := path + "/items.bin"
+	itemsPath := filepath.Join(path, itemsFilename)
 	itemsFile, err := filestream.Open(itemsPath, true)
 	if err != nil {
 		indexFile.MustClose()
 		return fmt.Errorf("cannot open items file in stream mode: %w", err)
 	}

-	lensPath := path + "/lens.bin"
+	lensPath := filepath.Join(path, lensFilename)
 	lensFile, err := filestream.Open(lensPath, true)
 	if err != nil {
 		indexFile.MustClose()
@@ -88,14 +88,14 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre

 	// Always cache metaindex file in OS page cache, since it is immediately
 	// read after the merge.
-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	metaindexFile, err := filestream.Create(metaindexPath, false)
 	if err != nil {
 		fs.MustRemoveDirAtomic(path)
 		return fmt.Errorf("cannot create metaindex file: %w", err)
 	}

-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	indexFile, err := filestream.Create(indexPath, nocache)
 	if err != nil {
 		metaindexFile.MustClose()
@@ -103,7 +103,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 		return fmt.Errorf("cannot create index file: %w", err)
 	}

-	itemsPath := path + "/items.bin"
+	itemsPath := filepath.Join(path, itemsFilename)
 	itemsFile, err := filestream.Create(itemsPath, nocache)
 	if err != nil {
 		metaindexFile.MustClose()
@@ -112,7 +112,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 		return fmt.Errorf("cannot create items file: %w", err)
 	}

-	lensPath := path + "/lens.bin"
+	lensPath := filepath.Join(path, lensFilename)
 	lensFile, err := filestream.Create(lensPath, nocache)
 	if err != nil {
 		metaindexFile.MustClose()
lib/mergeset/filenames.go (new file, 10 lines)

@@ -0,0 +1,10 @@
+package mergeset
+
+const (
+	metaindexFilename = "metaindex.bin"
+	indexFilename     = "index.bin"
+	itemsFilename     = "items.bin"
+	lensFilename      = "lens.bin"
+	metadataFilename  = "metadata.json"
+	partsFilename     = "parts.json"
+)
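Centralizing the part filenames in one place pairs with the commit-wide switch from `path + "/..."` concatenation to `filepath.Join`, which produces OS-correct separators and a cleaned path. A small portable illustration (not repository code):

```go
package main

import (
	"fmt"
	"path/filepath"
)

const indexFilename = "index.bin" // mirrors the constant above

func main() {
	part := filepath.Join("data", "parts", "0000000000000001")
	// filepath.Join uses the host OS separator and cleans the result:
	// "data/parts/0000000000000001/index.bin" on Unix,
	// `data\parts\0000000000000001\index.bin` on Windows.
	fmt.Println(filepath.Join(part, indexFilename))
}
```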
@@ -37,19 +37,19 @@ func (mp *inmemoryPart) StoreToDisk(path string) error {
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return fmt.Errorf("cannot create directory %q: %w", path, err)
 	}
-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
 		return fmt.Errorf("cannot store metaindex: %w", err)
 	}
-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	if err := fs.WriteFileAndSync(indexPath, mp.indexData.B); err != nil {
 		return fmt.Errorf("cannot store index: %w", err)
 	}
-	itemsPath := path + "/items.bin"
+	itemsPath := filepath.Join(path, itemsFilename)
 	if err := fs.WriteFileAndSync(itemsPath, mp.itemsData.B); err != nil {
 		return fmt.Errorf("cannot store items: %w", err)
 	}
-	lensPath := path + "/lens.bin"
+	lensPath := filepath.Join(path, lensFilename)
 	if err := fs.WriteFileAndSync(lensPath, mp.lensData.B); err != nil {
 		return fmt.Errorf("cannot store lens: %w", err)
 	}
@@ -2,6 +2,7 @@ package mergeset

 import (
 	"fmt"
+	"path/filepath"
 	"sync"
 	"unsafe"

@@ -72,22 +73,22 @@ func openFilePart(path string) (*part, error) {
 		return nil, fmt.Errorf("cannot read part metadata: %w", err)
 	}

-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	metaindexFile, err := filestream.Open(metaindexPath, true)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open %q: %w", metaindexPath, err)
 	}
 	metaindexSize := fs.MustFileSize(metaindexPath)

-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	indexFile := fs.MustOpenReaderAt(indexPath)
 	indexSize := fs.MustFileSize(indexPath)

-	itemsPath := path + "/items.bin"
+	itemsPath := filepath.Join(path, itemsFilename)
 	itemsFile := fs.MustOpenReaderAt(itemsPath)
 	itemsSize := fs.MustFileSize(itemsPath)

-	lensPath := path + "/lens.bin"
+	lensPath := filepath.Join(path, lensFilename)
 	lensFile := fs.MustOpenReaderAt(lensPath)
 	lensSize := fs.MustFileSize(lensPath)
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"path/filepath"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -81,7 +82,7 @@ func (ph *partHeader) ReadMetadata(partPath string) error {
 	ph.Reset()

 	// Read ph fields from metadata.
-	metadataPath := partPath + "/metadata.json"
+	metadataPath := filepath.Join(partPath, metadataFilename)
 	metadata, err := os.ReadFile(metadataPath)
 	if err != nil {
 		return fmt.Errorf("cannot read %q: %w", metadataPath, err)
@@ -123,7 +124,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
 	if err != nil {
 		logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err)
 	}
-	metadataPath := partPath + "/metadata.json"
+	metadataPath := filepath.Join(partPath, metadataFilename)
 	if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
 		return fmt.Errorf("cannot create %q: %w", metadataPath, err)
 	}
|
|||
mergeIdx := tb.nextMergeIdx()
|
||||
dstPartPath := ""
|
||||
if dstPartType == partFile {
|
||||
dstPartPath = fmt.Sprintf("%s/%016X", tb.path, mergeIdx)
|
||||
dstPartPath = filepath.Join(tb.path, fmt.Sprintf("%016X", mergeIdx))
|
||||
}
|
||||
|
||||
if isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
|
@ -1379,8 +1379,8 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
|
||||
// Remove txn and tmp directories, which may be left after the upgrade
|
||||
// to v1.90.0 and newer versions.
|
||||
fs.MustRemoveAll(path + "/txn")
|
||||
fs.MustRemoveAll(path + "/tmp")
|
||||
fs.MustRemoveAll(filepath.Join(path, "txn"))
|
||||
fs.MustRemoveAll(filepath.Join(path, "tmp"))
|
||||
|
||||
partNames := mustReadPartNames(path)
|
||||
|
||||
|
@ -1401,7 +1401,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
}
|
||||
fn := de.Name()
|
||||
if _, ok := m[fn]; !ok {
|
||||
deletePath := path + "/" + fn
|
||||
deletePath := filepath.Join(path, fn)
|
||||
fs.MustRemoveAll(deletePath)
|
||||
}
|
||||
}
|
||||
|
@ -1410,7 +1410,7 @@ func openParts(path string) ([]*partWrapper, error) {
|
|||
// Open parts
|
||||
var pws []*partWrapper
|
||||
for _, partName := range partNames {
|
||||
partPath := path + "/" + partName
|
||||
partPath := filepath.Join(path, partName)
|
||||
p, err := openFilePart(partPath)
|
||||
if err != nil {
|
||||
mustCloseParts(pws)
|
||||
|
@ -1456,7 +1456,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("cannot obtain absolute dir for %q: %w", dstDir, err)
|
||||
}
|
||||
if strings.HasPrefix(dstDir, srcDir+"/") {
|
||||
if strings.HasPrefix(dstDir, srcDir+string(filepath.Separator)) {
|
||||
return fmt.Errorf("cannot create snapshot %q inside the data dir %q", dstDir, srcDir)
|
||||
}
|
||||
|
||||
|
@ -1483,7 +1483,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
|
|||
return fmt.Errorf("cannot create snapshot for %q: timeout exceeded", tb.path)
|
||||
}
|
||||
srcPartPath := pw.p.path
|
||||
dstPartPath := dstDir + "/" + filepath.Base(srcPartPath)
|
||||
dstPartPath := filepath.Join(dstDir, filepath.Base(srcPartPath))
|
||||
if err := fs.HardLinkFiles(srcPartPath, dstPartPath); err != nil {
|
||||
return fmt.Errorf("cannot create hard links from %q to %q: %w", srcPartPath, dstPartPath, err)
|
||||
}
|
||||
|
@ -1512,14 +1512,14 @@ func mustWritePartNames(pws []*partWrapper, dstDir string) {
|
|||
if err != nil {
|
||||
logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
|
||||
}
|
||||
partNamesPath := dstDir + "/parts.json"
|
||||
partNamesPath := filepath.Join(dstDir, partsFilename)
|
||||
if err := fs.WriteFileAtomically(partNamesPath, data, true); err != nil {
|
||||
logger.Panicf("FATAL: cannot update %s: %s", partNamesPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
func mustReadPartNames(srcDir string) []string {
|
||||
partNamesPath := srcDir + "/parts.json"
|
||||
partNamesPath := filepath.Join(srcDir, partsFilename)
|
||||
data, err := os.ReadFile(partNamesPath)
|
||||
if err == nil {
|
||||
var partNames []string
|
||||
|
@ -1529,9 +1529,9 @@ func mustReadPartNames(srcDir string) []string {
|
|||
return partNames
|
||||
}
|
||||
if !os.IsNotExist(err) {
|
||||
logger.Panicf("FATAL: cannot read parts.json file: %s", err)
|
||||
logger.Panicf("FATAL: cannot read %s file: %s", partsFilename, err)
|
||||
}
|
||||
// The parts.json is missing. This is the upgrade from versions previous to v1.90.0.
|
||||
// The partsFilename is missing. This is the upgrade from versions previous to v1.90.0.
|
||||
// Read part names from directories under srcDir
|
||||
des, err := os.ReadDir(srcDir)
|
||||
if err != nil {
|
||||
|
|
|
@@ -145,20 +145,20 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 		return fmt.Errorf("cannot parse path to part: %w", err)
 	}

-	timestampsPath := path + "/timestamps.bin"
+	timestampsPath := filepath.Join(path, timestampsFilename)
 	timestampsFile, err := filestream.Open(timestampsPath, true)
 	if err != nil {
 		return fmt.Errorf("cannot open timestamps file in stream mode: %w", err)
 	}

-	valuesPath := path + "/values.bin"
+	valuesPath := filepath.Join(path, valuesFilename)
 	valuesFile, err := filestream.Open(valuesPath, true)
 	if err != nil {
 		timestampsFile.MustClose()
 		return fmt.Errorf("cannot open values file in stream mode: %w", err)
 	}

-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	indexFile, err := filestream.Open(indexPath, true)
 	if err != nil {
 		timestampsFile.MustClose()
@@ -166,7 +166,7 @@ func (bsr *blockStreamReader) InitFromFilePart(path string) error {
 		return fmt.Errorf("cannot open index file in stream mode: %w", err)
 	}

-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	metaindexFile, err := filestream.Open(metaindexPath, true)
 	if err != nil {
 		timestampsFile.MustClose()
@@ -104,14 +104,14 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 	}

 	// Create part files in the directory.
-	timestampsPath := path + "/timestamps.bin"
+	timestampsPath := filepath.Join(path, timestampsFilename)
 	timestampsFile, err := filestream.Create(timestampsPath, nocache)
 	if err != nil {
 		fs.MustRemoveDirAtomic(path)
 		return fmt.Errorf("cannot create timestamps file: %w", err)
 	}

-	valuesPath := path + "/values.bin"
+	valuesPath := filepath.Join(path, valuesFilename)
 	valuesFile, err := filestream.Create(valuesPath, nocache)
 	if err != nil {
 		timestampsFile.MustClose()
@@ -119,7 +119,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
 		return fmt.Errorf("cannot create values file: %w", err)
 	}

-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	indexFile, err := filestream.Create(indexPath, nocache)
 	if err != nil {
 		timestampsFile.MustClose()
@@ -130,7 +130,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre

 	// Always cache metaindex file in OS page cache, since it is immediately
 	// read after the merge.
-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	metaindexFile, err := filestream.Create(metaindexPath, false)
 	if err != nil {
 		timestampsFile.MustClose()
lib/storage/filenames.go (new file, 24 lines)

@@ -0,0 +1,24 @@
+package storage
+
+const (
+	metaindexFilename  = "metaindex.bin"
+	indexFilename      = "index.bin"
+	valuesFilename     = "values.bin"
+	timestampsFilename = "timestamps.bin"
+	partsFilename      = "parts.json"
+	metadataFilename   = "metadata.json"
+
+	appliedRetentionFilename    = "appliedRetention.txt"
+	resetCacheOnStartupFilename = "reset_cache_on_startup"
+)
+
+const (
+	smallDirname = "small"
+	bigDirname   = "big"
+
+	indexdbDirname   = "indexdb"
+	dataDirname      = "data"
+	metadataDirname  = "metadata"
+	snapshotsDirname = "snapshots"
+	cacheDirname     = "cache"
+)
@@ -40,19 +40,19 @@ func (mp *inmemoryPart) StoreToDisk(path string) error {
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return fmt.Errorf("cannot create directory %q: %w", path, err)
 	}
-	timestampsPath := path + "/timestamps.bin"
+	timestampsPath := filepath.Join(path, timestampsFilename)
 	if err := fs.WriteFileAndSync(timestampsPath, mp.timestampsData.B); err != nil {
 		return fmt.Errorf("cannot store timestamps: %w", err)
 	}
-	valuesPath := path + "/values.bin"
+	valuesPath := filepath.Join(path, valuesFilename)
 	if err := fs.WriteFileAndSync(valuesPath, mp.valuesData.B); err != nil {
 		return fmt.Errorf("cannot store values: %w", err)
 	}
-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	if err := fs.WriteFileAndSync(indexPath, mp.indexData.B); err != nil {
 		return fmt.Errorf("cannot store index: %w", err)
 	}
-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	if err := fs.WriteFileAndSync(metaindexPath, mp.metaindexData.B); err != nil {
 		return fmt.Errorf("cannot store metaindex: %w", err)
 	}
@@ -54,19 +54,19 @@ func openFilePart(path string) (*part, error) {
 		return nil, fmt.Errorf("cannot parse path to part: %w", err)
 	}

-	timestampsPath := path + "/timestamps.bin"
+	timestampsPath := filepath.Join(path, timestampsFilename)
 	timestampsFile := fs.MustOpenReaderAt(timestampsPath)
 	timestampsSize := fs.MustFileSize(timestampsPath)

-	valuesPath := path + "/values.bin"
+	valuesPath := filepath.Join(path, valuesFilename)
 	valuesFile := fs.MustOpenReaderAt(valuesPath)
 	valuesSize := fs.MustFileSize(valuesPath)

-	indexPath := path + "/index.bin"
+	indexPath := filepath.Join(path, indexFilename)
 	indexFile := fs.MustOpenReaderAt(indexPath)
 	indexSize := fs.MustFileSize(indexPath)

-	metaindexPath := path + "/metaindex.bin"
+	metaindexPath := filepath.Join(path, metaindexFilename)
 	metaindexFile, err := filestream.Open(metaindexPath, true)
 	if err != nil {
 		timestampsFile.MustClose()
@@ -48,7 +48,7 @@ func (ph *partHeader) Reset() {
 }

 func (ph *partHeader) readMinDedupInterval(partPath string) error {
-	filePath := partPath + "/min_dedup_interval"
+	filePath := filepath.Join(partPath, "min_dedup_interval")
 	data, err := os.ReadFile(filePath)
 	if err != nil {
 		if errors.Is(err, os.ErrNotExist) {
@@ -83,11 +83,7 @@ func (ph *partHeader) ParseFromPath(path string) error {
 	path = filepath.Clean(path)

 	// Extract encoded part name.
-	n := strings.LastIndexByte(path, '/')
-	if n < 0 {
-		return fmt.Errorf("cannot find encoded part name in the path %q", path)
-	}
-	partName := path[n+1:]
+	partName := filepath.Base(path)

 	// PartName must have the following form:
 	// RowsCount_BlocksCount_MinTimestamp_MaxTimestamp_Garbage
@@ -138,7 +134,7 @@ func (ph *partHeader) ParseFromPath(path string) error {
 func (ph *partHeader) ReadMetadata(partPath string) error {
 	ph.Reset()

-	metadataPath := partPath + "/metadata.json"
+	metadataPath := filepath.Join(partPath, metadataFilename)
 	metadata, err := os.ReadFile(metadataPath)
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -174,7 +170,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
 	if err != nil {
 		logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err)
 	}
-	metadataPath := partPath + "/metadata.json"
+	metadataPath := filepath.Join(partPath, metadataFilename)
 	if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
 		return fmt.Errorf("cannot create %q: %w", metadataPath, err)
 	}
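Replacing `strings.LastIndexByte(path, '/')` with `filepath.Base` is what makes part-name extraction work on Windows, where cleaned paths use backslashes and a search for `'/'` returns -1. A short demonstration (illustrative only; the part name below is made up):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

func main() {
	// On Windows a cleaned part path looks like this:
	p := `data\small\2023_04\1_1_100_200_ABCD`

	// The old approach only understands forward slashes,
	// so on such a path it finds nothing:
	fmt.Println(strings.LastIndexByte(p, '/')) // prints -1

	// filepath.Base understands the separator of the host OS,
	// so the same code works on both Unix and Windows:
	fmt.Println(filepath.Base("data/small/2023_04/1_1_100_200_ABCD"))
}
```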
@@ -216,8 +216,8 @@ func (pw *partWrapper) decRef() {
 // to small and big partitions.
 func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) {
 	name := timestampToPartitionName(timestamp)
-	smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
-	bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
+	smallPartsPath := filepath.Join(filepath.Clean(smallPartitionsPath), name)
+	bigPartsPath := filepath.Join(filepath.Clean(bigPartitionsPath), name)
 	logger.Infof("creating a partition %q with smallPartsPath=%q, bigPartsPath=%q", name, smallPartsPath, bigPartsPath)

 	if err := fs.MkdirAllFailIfExist(smallPartsPath); err != nil {
@@ -259,13 +259,8 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
 	smallPartsPath = filepath.Clean(smallPartsPath)
 	bigPartsPath = filepath.Clean(bigPartsPath)

-	n := strings.LastIndexByte(smallPartsPath, '/')
-	if n < 0 {
-		return nil, fmt.Errorf("cannot find partition name from smallPartsPath %q; must be in the form /path/to/smallparts/YYYY_MM", smallPartsPath)
-	}
-	name := smallPartsPath[n+1:]
-
-	if !strings.HasSuffix(bigPartsPath, "/"+name) {
+	name := filepath.Base(smallPartsPath)
+	if !strings.HasSuffix(bigPartsPath, name) {
 		return nil, fmt.Errorf("patititon name in bigPartsPath %q doesn't match smallPartsPath %q; want %q", bigPartsPath, smallPartsPath, name)
 	}

@@ -1427,7 +1422,7 @@ func (pt *partition) getDstPartPath(dstPartType partType, mergeIdx uint64) strin
 	}
 	dstPartPath := ""
 	if dstPartType != partInmemory {
-		dstPartPath = fmt.Sprintf("%s/%016X", ptPath, mergeIdx)
+		dstPartPath = filepath.Join(ptPath, fmt.Sprintf("%016X", mergeIdx))
 	}
 	return dstPartPath
 }
@@ -1826,8 +1821,8 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) {

 	// Remove txn and tmp directories, which may be left after the upgrade
 	// to v1.90.0 and newer versions.
-	fs.MustRemoveAll(path + "/txn")
-	fs.MustRemoveAll(path + "/tmp")
+	fs.MustRemoveAll(filepath.Join(path, "txn"))
+	fs.MustRemoveAll(filepath.Join(path, "tmp"))

 	// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
 	// or after the update from versions prior to v1.90.0.
@@ -1846,7 +1841,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) {
 		}
 		fn := de.Name()
 		if _, ok := m[fn]; !ok {
-			deletePath := path + "/" + fn
+			deletePath := filepath.Join(path, fn)
 			fs.MustRemoveAll(deletePath)
 		}
 	}
@@ -1855,7 +1850,7 @@ func openParts(path string, partNames []string) ([]*partWrapper, error) {
 	// Open parts
 	var pws []*partWrapper
 	for _, partName := range partNames {
-		partPath := path + "/" + partName
+		partPath := filepath.Join(path, partName)
 		p, err := openFilePart(partPath)
 		if err != nil {
 			mustCloseParts(pws)
@@ -1931,19 +1926,19 @@ func (pt *partition) createSnapshot(srcDir, dstDir string, pws []*partWrapper) e
 	// Make hardlinks for pws at dstDir
 	for _, pw := range pws {
 		srcPartPath := pw.p.path
-		dstPartPath := dstDir + "/" + filepath.Base(srcPartPath)
+		dstPartPath := filepath.Join(dstDir, filepath.Base(srcPartPath))
 		if err := fs.HardLinkFiles(srcPartPath, dstPartPath); err != nil {
 			return fmt.Errorf("cannot create hard links from %q to %q: %w", srcPartPath, dstPartPath, err)
 		}
 	}

-	// Copy the appliedRetention.txt file to dstDir.
+	// Copy the appliedRetentionFilename to dstDir.
 	// This file can be created by VictoriaMetrics enterprise.
 	// See https://docs.victoriametrics.com/#retention-filters .
 	// Do not make hard link to this file, since it can be modified over time.
-	srcPath := srcDir + "/appliedRetention.txt"
+	srcPath := filepath.Join(srcDir, appliedRetentionFilename)
 	if fs.IsPathExist(srcPath) {
-		dstPath := dstDir + "/" + filepath.Base(srcPath)
+		dstPath := filepath.Join(dstDir, filepath.Base(srcPath))
 		if err := fs.CopyFile(srcPath, dstPath); err != nil {
 			return fmt.Errorf("cannot copy %q to %q: %w", srcPath, dstPath, err)
 		}
@@ -1972,7 +1967,7 @@ func mustWritePartNames(pwsSmall, pwsBig []*partWrapper, dstDir string) {
 	if err != nil {
 		logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
 	}
-	partNamesPath := dstDir + "/parts.json"
+	partNamesPath := filepath.Join(dstDir, partsFilename)
 	if err := fs.WriteFileAtomically(partNamesPath, data, true); err != nil {
 		logger.Panicf("FATAL: cannot update %s: %s", partNamesPath, err)
 	}
@@ -1993,7 +1988,7 @@ func getPartNames(pws []*partWrapper) []string {
 }

 func mustReadPartNames(smallPartsPath, bigPartsPath string) ([]string, []string) {
-	partNamesPath := smallPartsPath + "/parts.json"
+	partNamesPath := filepath.Join(smallPartsPath, partsFilename)
 	data, err := os.ReadFile(partNamesPath)
 	if err == nil {
 		var partNames partNamesJSON
@@ -2003,9 +1998,9 @@ func mustReadPartNames(smallPartsPath, bigPartsPath string) ([]string, []string)
 		return partNames.Small, partNames.Big
 	}
 	if !os.IsNotExist(err) {
-		logger.Panicf("FATAL: cannot read parts.json file: %s", err)
+		logger.Panicf("FATAL: cannot read %s file: %s", partsFilename, err)
 	}
-	// The parts.json is missing. This is the upgrade from versions previous to v1.90.0.
+	// The partsFilename is missing. This is the upgrade from versions previous to v1.90.0.
 	// Read part names from smallPartsPath and bigPartsPath directories
 	partNamesSmall := mustReadPartNamesFromDir(smallPartsPath)
 	partNamesBig := mustReadPartNamesFromDir(bigPartsPath)
@@ -2037,5 +2032,5 @@ func mustReadPartNamesFromDir(srcDir string) []string {
 }

 func isSpecialDir(name string) bool {
-	return name == "tmp" || name == "txn" || name == "snapshots" || fs.IsScheduledForRemoval(name)
+	return name == "tmp" || name == "txn" || name == snapshotsDirname || fs.IsScheduledForRemoval(name)
 }
@@ -169,17 +169,17 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 	// Create partition from rowss and test search on it.
 	strg := newTestStorage()
 	strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
-	pt, err := createPartition(ptt, "./small-table", "./big-table", strg)
+	pt, err := createPartition(ptt, "small-table", "big-table", strg)
 	if err != nil {
 		t.Fatalf("cannot create partition: %s", err)
 	}
 	smallPartsPath := pt.smallPartsPath
 	bigPartsPath := pt.bigPartsPath
 	defer func() {
-		if err := os.RemoveAll("./small-table"); err != nil {
+		if err := os.RemoveAll("small-table"); err != nil {
 			t.Fatalf("cannot remove small parts directory: %s", err)
 		}
-		if err := os.RemoveAll("./big-table"); err != nil {
+		if err := os.RemoveAll("big-table"); err != nil {
 			t.Fatalf("cannot remove big parts directory: %s", err)
 		}
 	}()
@@ -16,6 +16,7 @@ import (
 	"time"
 	"unsafe"

+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
@@ -147,7 +148,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	}
 	s := &Storage{
 		path:           path,
-		cachePath:      path + "/cache",
+		cachePath:      filepath.Join(path, cacheDirname),
 		retentionMsecs: retentionMsecs,
 		stop:           make(chan struct{}),
 	}
@@ -156,10 +157,10 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	}

 	// Check whether the cache directory must be removed
-	// It is removed if it contains reset_cache_on_startup file.
+	// It is removed if it contains resetCacheOnStartupFilename.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
-	if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
-		logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
+	if fs.IsPathExist(filepath.Join(s.cachePath, resetCacheOnStartupFilename)) {
+		logger.Infof("removing cache directory at %q, since it contains `%s` file...", s.cachePath, resetCacheOnStartupFilename)
 		// Do not use fs.MustRemoveAll() here, since the cache directory may be mounted
 		// to a separate filesystem. In this case the fs.MustRemoveAll() will fail while
 		// trying to remove the mount root.
@@ -176,13 +177,13 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	s.flockF = flockF

 	// Check whether restore process finished successfully
-	restoreLockF := path + "/restore-in-progress"
+	restoreLockF := filepath.Join(path, backupnames.RestoreInProgressFilename)
 	if fs.IsPathExist(restoreLockF) {
 		return nil, fmt.Errorf("restore lock file exists, incomplete vmrestore run. Run vmrestore again or remove lock file %q", restoreLockF)
 	}

 	// Pre-create snapshots directory if it is missing.
-	snapshotsPath := path + "/snapshots"
+	snapshotsPath := filepath.Join(path, snapshotsDirname)
 	if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err)
 	}
@@ -218,16 +219,16 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	s.prefetchedMetricIDs.Store(&uint64set.Set{})

 	// Load metadata
-	metadataDir := path + "/metadata"
-	isEmptyDB := !fs.IsPathExist(path + "/indexdb")
+	metadataDir := filepath.Join(path, metadataDirname)
+	isEmptyDB := !fs.IsPathExist(filepath.Join(path, indexdbDirname))
 	if err := fs.MkdirAllIfNotExist(metadataDir); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", metadataDir, err)
 	}
 	s.minTimestampForCompositeIndex = mustGetMinTimestampForCompositeIndex(metadataDir, isEmptyDB)

 	// Load indexdb
-	idbPath := path + "/indexdb"
-	idbSnapshotsPath := idbPath + "/snapshots"
+	idbPath := filepath.Join(path, indexdbDirname)
+	idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname)
 	if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
 	}
@@ -252,7 +253,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	s.updateDeletedMetricIDs(dmisPrev)

 	// Load data
-	tablePath := path + "/data"
+	tablePath := filepath.Join(path, dataDirname)
 	tb, err := openTable(tablePath, s)
 	if err != nil {
 		s.idb().MustClose()
@@ -322,7 +323,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) {

 	snapshotName := snapshot.NewName()
 	srcDir := s.path
-	dstDir := fmt.Sprintf("%s/snapshots/%s", srcDir, snapshotName)
+	dstDir := filepath.Join(srcDir, snapshotsDirname, snapshotName)
 	if err := fs.MkdirAllFailIfExist(dstDir); err != nil {
 		return "", fmt.Errorf("cannot create dir %q: %w", dstDir, err)
 	}
@@ -334,42 +335,42 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) {
 	}
 	dirsToRemoveOnError = append(dirsToRemoveOnError, smallDir, bigDir)

-	dstDataDir := dstDir + "/data"
+	dstDataDir := filepath.Join(dstDir, dataDirname)
 	if err := fs.MkdirAllFailIfExist(dstDataDir); err != nil {
 		return "", fmt.Errorf("cannot create dir %q: %w", dstDataDir, err)
 	}
-	dstSmallDir := dstDataDir + "/small"
+	dstSmallDir := filepath.Join(dstDataDir, smallDirname)
 	if err := fs.SymlinkRelative(smallDir, dstSmallDir); err != nil {
 		return "", fmt.Errorf("cannot create symlink from %q to %q: %w", smallDir, dstSmallDir, err)
 	}
-	dstBigDir := dstDataDir + "/big"
+	dstBigDir := filepath.Join(dstDataDir, bigDirname)
 	if err := fs.SymlinkRelative(bigDir, dstBigDir); err != nil {
 		return "", fmt.Errorf("cannot create symlink from %q to %q: %w", bigDir, dstBigDir, err)
 	}
 	fs.MustSyncPath(dstDataDir)

-	srcMetadataDir := srcDir + "/metadata"
-	dstMetadataDir := dstDir + "/metadata"
+	srcMetadataDir := filepath.Join(srcDir, metadataDirname)
+	dstMetadataDir := filepath.Join(dstDir, metadataDirname)
 	if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil {
 		return "", fmt.Errorf("cannot copy metadata: %w", err)
 	}

-	idbSnapshot := fmt.Sprintf("%s/indexdb/snapshots/%s", srcDir, snapshotName)
+	idbSnapshot := filepath.Join(srcDir, indexdbDirname, snapshotsDirname, snapshotName)
 	idb := s.idb()
-	currSnapshot := idbSnapshot + "/" + idb.name
+	currSnapshot := filepath.Join(idbSnapshot, idb.name)
 	if err := idb.tb.CreateSnapshotAt(currSnapshot, deadline); err != nil {
 		return "", fmt.Errorf("cannot create curr indexDB snapshot: %w", err)
 	}
 	dirsToRemoveOnError = append(dirsToRemoveOnError, idbSnapshot)

 	ok := idb.doExtDB(func(extDB *indexDB) {
-		prevSnapshot := idbSnapshot + "/" + extDB.name
+		prevSnapshot := filepath.Join(idbSnapshot, extDB.name)
 		err = extDB.tb.CreateSnapshotAt(prevSnapshot, deadline)
 	})
 	if ok && err != nil {
 		return "", fmt.Errorf("cannot create prev indexDB snapshot: %w", err)
 	}
-	dstIdbDir := dstDir + "/indexdb"
+	dstIdbDir := filepath.Join(dstDir, indexdbDirname)
 	if err := fs.SymlinkRelative(idbSnapshot, dstIdbDir); err != nil {
 		return "", fmt.Errorf("cannot create symlink from %q to %q: %w", idbSnapshot, dstIdbDir, err)
 	}
@@ -383,7 +384,7 @@ func (s *Storage) CreateSnapshot(deadline uint64) (string, error) {

 // ListSnapshots returns sorted list of existing snapshots for s.
 func (s *Storage) ListSnapshots() ([]string, error) {
-	snapshotsPath := s.path + "/snapshots"
+	snapshotsPath := filepath.Join(s.path, snapshotsDirname)
 	d, err := os.Open(snapshotsPath)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open snapshots directory: %w", err)
@@ -410,13 +411,13 @@ func (s *Storage) DeleteSnapshot(snapshotName string) error {
 	if err := snapshot.Validate(snapshotName); err != nil {
 		return fmt.Errorf("invalid snapshotName %q: %w", snapshotName, err)
 	}
-	snapshotPath := s.path + "/snapshots/" + snapshotName
+	snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)

 	logger.Infof("deleting snapshot %q...", snapshotPath)
 	startTime := time.Now()

 	s.tb.MustDeleteSnapshot(snapshotName)
-	idbPath := fmt.Sprintf("%s/indexdb/snapshots/%s", s.path, snapshotName)
+	idbPath := filepath.Join(s.path, indexdbDirname, snapshotsDirname, snapshotName)
 	fs.MustRemoveDirAtomic(idbPath)
 	fs.MustRemoveDirAtomic(snapshotPath)

@@ -723,7 +724,7 @@ func (s *Storage) nextDayMetricIDsUpdater() {
 func (s *Storage) mustRotateIndexDB() {
 	// Create new indexdb table.
 	newTableName := nextIndexDBTableName()
-	idbNewPath := s.path + "/indexdb/" + newTableName
+	idbNewPath := filepath.Join(s.path, indexdbDirname, newTableName)
 	rotationTimestamp := fasttime.UnixTimestamp()
 	idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
 	if err != nil {
@@ -835,7 +836,7 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
 		date: date,
 	}
 	name := "next_day_metric_ids"
-	path := s.cachePath + "/" + name
+	path := filepath.Join(s.cachePath, name)
 	logger.Infof("loading %s from %q...", name, path)
 	startTime := time.Now()
 	if !fs.IsPathExist(path) {
@@ -879,7 +880,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
 	hm := &hourMetricIDs{
 		hour: hour,
 	}
-	path := s.cachePath + "/" + name
+	path := filepath.Join(s.cachePath, name)
 	logger.Infof("loading %s from %q...", name, path)
 	startTime := time.Now()
 	if !fs.IsPathExist(path) {
@@ -921,7 +922,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs

 func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
 	name := "next_day_metric_ids"
-	path := s.cachePath + "/" + name
+	path := filepath.Join(s.cachePath, name)
 	logger.Infof("saving %s to %q...", name, path)
 	startTime := time.Now()
 	dst := make([]byte, 0, e.v.Len()*8+16)
@@ -939,7 +940,7 @@ func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
 }

 func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
-	path := s.cachePath + "/" + name
+	path := filepath.Join(s.cachePath, name)
 	logger.Infof("saving %s to %q...", name, path)
 	startTime := time.Now()
 	dst := make([]byte, 0, hm.m.Len()*8+24)
@@ -983,7 +984,7 @@ func marshalUint64Set(dst []byte, m *uint64set.Set) []byte {
 }

 func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) int64 {
-	path := metadataDir + "/minTimestampForCompositeIndex"
+	path := filepath.Join(metadataDir, "minTimestampForCompositeIndex")
 	minTimestamp, err := loadMinTimestampForCompositeIndex(path)
 	if err == nil {
 		return minTimestamp
@@ -1019,7 +1020,7 @@ func loadMinTimestampForCompositeIndex(path string) (int64, error) {
 }

 func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache {
-	path := s.cachePath + "/" + name
+	path := filepath.Join(s.cachePath, name)
 	logger.Infof("loading %s cache from %q...", info, path)
 	startTime := time.Now()
 	c := workingsetcache.Load(path, sizeBytes)
@@ -1034,7 +1035,7 @@ func (s *Storage) mustSaveCache(c *workingsetcache.Cache, info, name string) {
 	saveCacheLock.Lock()
 	defer saveCacheLock.Unlock()

-	path := s.cachePath + "/" + name
+	path := filepath.Join(s.cachePath, name)
 	logger.Infof("saving %s cache to %q...", info, path)
 	startTime := time.Now()
 	if err := c.Save(path); err != nil {
@@ -2375,7 +2376,7 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error

 	// Remove all the tables except two last tables.
 	for _, tn := range tableNames[:len(tableNames)-2] {
-		pathToRemove := path + "/" + tn
+		pathToRemove := filepath.Join(path, tn)
 		logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
 		fs.MustRemoveAll(pathToRemove)
 		logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
@@ -2385,13 +2386,13 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
 	fs.MustSyncPath(path)

 	// Open the last two tables.
-	currPath := path + "/" + tableNames[len(tableNames)-1]
+	currPath := filepath.Join(path, tableNames[len(tableNames)-1])

 	curr, err = openIndexDB(currPath, s, 0, &s.isReadOnly)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
 	}
-	prevPath := path + "/" + tableNames[len(tableNames)-2]
+	prevPath := filepath.Join(path, tableNames[len(tableNames)-2])
 	prev, err = openIndexDB(prevPath, s, 0, &s.isReadOnly)
 	if err != nil {
 		curr.MustClose()
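The snapshot paths that used to be built with `fmt.Sprintf("%s/snapshots/%s", ...)` are now assembled with a single multi-argument `filepath.Join`, which handles nesting and separators in one call. A small sketch of the difference (the snapshot name below is made up for illustration):

```go
package main

import (
	"fmt"
	"path/filepath"
)

const snapshotsDirname = "snapshots" // mirrors the constant above

func main() {
	srcDir := "victoria-metrics-data"
	snapshotName := "20230401120000-16D15A54BB8FE2B1"

	// Old style: hard-coded forward slashes.
	old := fmt.Sprintf("%s/snapshots/%s", srcDir, snapshotName)

	// New style: one Join call, OS-correct separators.
	dst := filepath.Join(srcDir, snapshotsDirname, snapshotName)

	fmt.Println(old)
	fmt.Println(dst)
}
```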
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"reflect"
 	"sort"
 	"strings"
@@ -1036,7 +1037,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
 	}

 	// Try opening the storage from snapshot.
-	snapshotPath := s.path + "/snapshots/" + snapshotName
+	snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)
 	s1, err := OpenStorage(snapshotPath, 0, 0, 0)
 	if err != nil {
 		return fmt.Errorf("cannot open storage from snapshot: %w", err)
@@ -95,23 +95,23 @@ func openTable(path string, s *Storage) (*table, error) {
 	}

 	// Create directories for small and big partitions if they don't exist yet.
-	smallPartitionsPath := path + "/small"
+	smallPartitionsPath := filepath.Join(path, smallDirname)
 	if err := fs.MkdirAllIfNotExist(smallPartitionsPath); err != nil {
 		return nil, fmt.Errorf("cannot create directory for small partitions %q: %w", smallPartitionsPath, err)
 	}
 	fs.MustRemoveTemporaryDirs(smallPartitionsPath)
-	smallSnapshotsPath := smallPartitionsPath + "/snapshots"
+	smallSnapshotsPath := filepath.Join(smallPartitionsPath, snapshotsDirname)
 	if err := fs.MkdirAllIfNotExist(smallSnapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", smallSnapshotsPath, err)
 	}
 	fs.MustRemoveTemporaryDirs(smallSnapshotsPath)

-	bigPartitionsPath := path + "/big"
+	bigPartitionsPath := filepath.Join(path, bigDirname)
 	if err := fs.MkdirAllIfNotExist(bigPartitionsPath); err != nil {
 		return nil, fmt.Errorf("cannot create directory for big partitions %q: %w", bigPartitionsPath, err)
 	}
 	fs.MustRemoveTemporaryDirs(bigPartitionsPath)
-	bigSnapshotsPath := bigPartitionsPath + "/snapshots"
+	bigSnapshotsPath := filepath.Join(bigPartitionsPath, snapshotsDirname)
 	if err := fs.MkdirAllIfNotExist(bigSnapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", bigSnapshotsPath, err)
 	}
@@ -151,11 +151,11 @@ func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, s
 	ptws := tb.GetPartitions(nil)
 	defer tb.PutPartitions(ptws)

-	dstSmallDir := fmt.Sprintf("%s/small/snapshots/%s", tb.path, snapshotName)
+	dstSmallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName)
 	if err := fs.MkdirAllFailIfExist(dstSmallDir); err != nil {
 		return "", "", fmt.Errorf("cannot create dir %q: %w", dstSmallDir, err)
 	}
-	dstBigDir := fmt.Sprintf("%s/big/snapshots/%s", tb.path, snapshotName)
+	dstBigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName)
 	if err := fs.MkdirAllFailIfExist(dstBigDir); err != nil {
 		fs.MustRemoveAll(dstSmallDir)
 		return "", "", fmt.Errorf("cannot create dir %q: %w", dstBigDir, err)
@@ -168,8 +168,8 @@ func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, s
 			return "", "", fmt.Errorf("cannot create snapshot for %q: timeout exceeded", tb.path)
 		}

-		smallPath := dstSmallDir + "/" + ptw.pt.name
-		bigPath := dstBigDir + "/" + ptw.pt.name
+		smallPath := filepath.Join(dstSmallDir, ptw.pt.name)
+		bigPath := filepath.Join(dstBigDir, ptw.pt.name)
 		if err := ptw.pt.CreateSnapshotAt(smallPath, bigPath); err != nil {
 			fs.MustRemoveAll(dstSmallDir)
 			fs.MustRemoveAll(dstBigDir)
@@ -188,9 +188,9 @@ func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, s

 // MustDeleteSnapshot deletes snapshot with the given snapshotName.
 func (tb *table) MustDeleteSnapshot(snapshotName string) {
-	smallDir := fmt.Sprintf("%s/small/snapshots/%s", tb.path, snapshotName)
+	smallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName)
 	fs.MustRemoveDirAtomic(smallDir)
-	bigDir := fmt.Sprintf("%s/big/snapshots/%s", tb.path, snapshotName)
+	bigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName)
 	fs.MustRemoveDirAtomic(bigDir)
 }

@@ -522,8 +522,8 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) (
 	}
 	var pts []*partition
 	for ptName := range ptNames {
-		smallPartsPath := smallPartitionsPath + "/" + ptName
-		bigPartsPath := bigPartitionsPath + "/" + ptName
+		smallPartsPath := filepath.Join(smallPartitionsPath, ptName)
+		bigPartsPath := filepath.Join(bigPartitionsPath, ptName)
 		pt, err := openPartition(smallPartsPath, bigPartsPath, s)
 		if err != nil {
 			mustClosePartitions(pts)
@@ -545,7 +545,7 @@ func populatePartitionNames(partitionsPath string, ptNames map[string]bool) erro
 			continue
 		}
 		ptName := de.Name()
-		if ptName == "snapshots" {
+		if ptName == snapshotsDirname {
 			// Skip directory with snapshots
 			continue
 		}
@@ -184,12 +184,12 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange,

 	// Create a table from rowss and test search on it.
 	strg := newTestStorage()
-	tb, err := openTable("./test-table", strg)
+	tb, err := openTable("test-table", strg)
 	if err != nil {
 		t.Fatalf("cannot create table: %s", err)
 	}
 	defer func() {
-		if err := os.RemoveAll("./test-table"); err != nil {
+		if err := os.RemoveAll("test-table"); err != nil {
 			t.Fatalf("cannot remove table directory: %s", err)
 		}
 	}()
@@ -205,7 +205,7 @@ func testTableSearchEx(t *testing.T, rng *rand.Rand, trData, trSearch TimeRange,
 	tb.MustClose()

 	// Open the created table and test search on it.
-	tb, err = openTable("./test-table", strg)
+	tb, err = openTable("test-table", strg)
 	if err != nil {
 		t.Fatalf("cannot open table: %s", err)
 	}
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -14,7 +15,7 @@ import (

 func TestMain(m *testing.M) {
 	n := m.Run()
-	if err := os.RemoveAll("./benchmarkTableSearch"); err != nil {
+	if err := os.RemoveAll("benchmarkTableSearch"); err != nil {
 		panic(fmt.Errorf("cannot remove benchmark tables: %w", err))
 	}
 	os.Exit(n)
@@ -39,7 +40,7 @@ func BenchmarkTableSearch(b *testing.B) {
 func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) *table {
 	b.Helper()

-	path := fmt.Sprintf("./benchmarkTableSearch/rows%d_tsids%d", rowsCount, tsidsCount)
+	path := filepath.Join("benchmarkTableSearch", fmt.Sprintf("rows%d_tsids%d", rowsCount, tsidsCount))
 	if !createdBenchTables[path] {
 		createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 		createdBenchTables[path] = true
@@ -45,7 +45,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	b.ResetTimer()
 	b.ReportAllocs()
 	b.SetBytes(int64(rowsCountExpected))
-	tablePath := "./benchmarkTableAddRows"
+	tablePath := "benchmarkTableAddRows"
 	strg := newTestStorage()
 	for i := 0; i < b.N; i++ {
 		tb, err := openTable(tablePath, strg)