mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
This commit is contained in:
commit
966e9c227a
10 changed files with 285 additions and 52 deletions
1
.github/workflows/main.yml
vendored
1
.github/workflows/main.yml
vendored
|
@ -58,6 +58,7 @@ jobs:
|
|||
GOOS=darwin go build -mod=vendor ./app/vmbackup
|
||||
GOOS=darwin go build -mod=vendor ./app/vmrestore
|
||||
GOOS=darwin go build -mod=vendor ./app/vmctl
|
||||
CGO_ENABLED=0 GOOS=windows go build -mod=vendor ./app/vmagent
|
||||
- name: Publish coverage
|
||||
uses: codecov/codecov-action@v1.0.6
|
||||
with:
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
* FEATURE: vmagent: optimize [relabeling](https://victoriametrics.github.io/vmagent.html#relabeling) performance for common cases.
|
||||
* FEATURE: add `increase_pure(m[d])` function to MetricsQL. It works the same as `increase(m[d])` except for various edge cases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962) for details.
|
||||
* FEATURE: increase accuracy for `buckets_limit(limit, buckets)` results for small `limit` values. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for details.
|
||||
* FEATURE: vmagent: initial support for Windows build with `CGO_ENABLED=0 GOOS=windows go build -mod=vendor ./app/vmagent`. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1036).
|
||||
|
||||
* BUGFIX: vmagent: properly perform graceful shutdown on `SIGINT` and `SIGTERM` signals. The graceful shutdown has been broken in `v1.54.0`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
|
||||
* BUGFIX: reduce the probability of `duplicate time series` errors when querying Kubernetes metrics.
|
||||
|
|
41
lib/fs/fs.go
41
lib/fs/fs.go
|
@ -13,26 +13,15 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
var tmpFileNum uint64
|
||||
|
||||
// MustSyncPath syncs contents of the given path.
|
||||
func MustSyncPath(path string) {
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open %q: %s", path, err)
|
||||
}
|
||||
if err := d.Sync(); err != nil {
|
||||
_ = d.Close()
|
||||
logger.Panicf("FATAL: cannot flush %q to storage: %s", path, err)
|
||||
}
|
||||
if err := d.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close %q: %s", path, err)
|
||||
}
|
||||
mustSyncPath(path)
|
||||
}
|
||||
|
||||
var tmpFileNum uint64
|
||||
|
||||
// WriteFileAtomically atomically writes data to the given file path.
|
||||
//
|
||||
// WriteFileAtomically returns only after the file is fully written and synced
|
||||
|
@ -333,14 +322,7 @@ func MustWriteData(w io.Writer, data []byte) {
|
|||
// and returns the handler to the file.
|
||||
func CreateFlockFile(dir string) (*os.File, error) {
|
||||
flockFile := dir + "/flock.lock"
|
||||
flockF, err := os.Create(flockFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create lock file %q: %w", flockFile, err)
|
||||
}
|
||||
if err := unix.Flock(int(flockF.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
|
||||
return nil, fmt.Errorf("cannot acquire lock on file %q: %w", flockFile, err)
|
||||
}
|
||||
return flockF, nil
|
||||
return createFlockFile(flockFile)
|
||||
}
|
||||
|
||||
// MustGetFreeSpace returns free space for the given directory path.
|
||||
|
@ -372,18 +354,3 @@ type freeSpaceEntry struct {
|
|||
updateTime uint64
|
||||
freeSpace uint64
|
||||
}
|
||||
|
||||
func mustGetFreeSpace(path string) uint64 {
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot determine free disk space on %q: %s", path, err)
|
||||
}
|
||||
defer MustClose(d)
|
||||
|
||||
fd := d.Fd()
|
||||
var stat unix.Statfs_t
|
||||
if err := unix.Fstatfs(int(fd), &stat); err != nil {
|
||||
logger.Panicf("FATAL: cannot determine free disk space on %q: %s", path, err)
|
||||
}
|
||||
return freeSpace(stat)
|
||||
}
|
||||
|
|
|
@ -2,7 +2,9 @@
|
|||
|
||||
package fs
|
||||
|
||||
import "golang.org/x/sys/unix"
|
||||
import (
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func freeSpace(stat unix.Statfs_t) uint64 {
|
||||
return uint64(stat.Bavail) * uint64(stat.Bsize)
|
||||
|
|
59
lib/fs/fs_unix.go
Normal file
59
lib/fs/fs_unix.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
// +build linux darwin freebsd openbsd
|
||||
|
||||
package fs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func mmap(fd int, length int) (data []byte, err error) {
|
||||
return unix.Mmap(fd, 0, length, unix.PROT_READ, unix.MAP_SHARED)
|
||||
|
||||
}
|
||||
func mUnmap(data []byte) error {
|
||||
return unix.Munmap(data)
|
||||
}
|
||||
|
||||
func mustSyncPath(path string) {
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot open %q: %s", path, err)
|
||||
}
|
||||
if err := d.Sync(); err != nil {
|
||||
_ = d.Close()
|
||||
logger.Panicf("FATAL: cannot flush %q to storage: %s", path, err)
|
||||
}
|
||||
if err := d.Close(); err != nil {
|
||||
logger.Panicf("FATAL: cannot close %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func createFlockFile(flockFile string) (*os.File, error) {
|
||||
flockF, err := os.Create(flockFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create lock file %q: %w", flockFile, err)
|
||||
}
|
||||
if err := unix.Flock(int(flockF.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
|
||||
return nil, fmt.Errorf("cannot acquire lock on file %q: %w", flockFile, err)
|
||||
}
|
||||
return flockF, nil
|
||||
}
|
||||
|
||||
func mustGetFreeSpace(path string) uint64 {
|
||||
d, err := os.Open(path)
|
||||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot determine free disk space on %q: %s", path, err)
|
||||
}
|
||||
defer MustClose(d)
|
||||
|
||||
fd := d.Fd()
|
||||
var stat unix.Statfs_t
|
||||
if err := unix.Fstatfs(int(fd), &stat); err != nil {
|
||||
logger.Panicf("FATAL: cannot determine free disk space on %q: %s", path, err)
|
||||
}
|
||||
return freeSpace(stat)
|
||||
}
|
146
lib/fs/fs_windows.go
Normal file
146
lib/fs/fs_windows.go
Normal file
|
@ -0,0 +1,146 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
var (
|
||||
kernelDLL = windows.MustLoadDLL("kernel32.dll")
|
||||
procLock = kernelDLL.MustFindProc("LockFileEx")
|
||||
procEvent = kernelDLL.MustFindProc("CreateEventW")
|
||||
procDisk = kernelDLL.MustFindProc("GetDiskFreeSpaceExW")
|
||||
ntDLL = windows.MustLoadDLL("ntdll.dll")
|
||||
ntSetInformationProc = ntDLL.MustFindProc("NtSetInformationFile")
|
||||
)
|
||||
|
||||
// Syncing a path panics on Windows if the file is already opened by another process.
|
||||
// One possible solution is to change the file opening process so that files are opened with the correct flags.
|
||||
// https://github.com/dgraph-io/badger/issues/699
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-flushfilebuffers
|
||||
func mustSyncPath(string) {
|
||||
}
|
||||
|
||||
const (
|
||||
lockfileExclusiveLock = 2
|
||||
fileFlagNormal = 0x00000080
|
||||
// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/ntddk/ns-ntddk-_file_disposition_information_ex
|
||||
fileDispositionPosixSemantics = 0x00000002
|
||||
fileDispositionIgnoreReadonlyAttribute = 0x00000010
|
||||
)
|
||||
|
||||
// https://github.com/juju/fslock/blob/master/fslock_windows.go
|
||||
func createFlockFile(flockFile string) (*os.File, error) {
|
||||
name, err := windows.UTF16PtrFromString(flockFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
handle, err := windows.CreateFile(
|
||||
name,
|
||||
windows.GENERIC_READ|windows.DELETE,
|
||||
windows.FILE_SHARE_READ|windows.FILE_SHARE_DELETE,
|
||||
nil,
|
||||
windows.OPEN_ALWAYS,
|
||||
windows.FILE_FLAG_OVERLAPPED|fileFlagNormal,
|
||||
0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create lock file %q: %w", flockFile, err)
|
||||
}
|
||||
ol, err := newOverlapped()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create Overlapped handler: %w", err)
|
||||
}
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-lockfileex
|
||||
// overlapped is dropped?
|
||||
r1, _, err := procLock.Call(uintptr(handle), uintptr(lockfileExclusiveLock), uintptr(0), uintptr(1), uintptr(0), uintptr(unsafe.Pointer(ol)))
|
||||
if r1 == 0 {
|
||||
return nil, err
|
||||
}
|
||||
return os.NewFile(uintptr(handle), flockFile), nil
|
||||
}
|
||||
|
||||
// stub
|
||||
func mmap(fd int, length int) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// stub
|
||||
func mUnmap([]byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func mustGetFreeSpace(path string) uint64 {
|
||||
var freeBytes int64
|
||||
r, _, err := procDisk.Call(uintptr(unsafe.Pointer(windows.StringToUTF16Ptr(path))),
|
||||
uintptr(unsafe.Pointer(&freeBytes)))
|
||||
if r == 0 {
|
||||
logger.Errorf("cannot get free space: %v", err)
|
||||
return 0
|
||||
}
|
||||
return uint64(freeBytes)
|
||||
}
|
||||
|
||||
// stub
|
||||
func fadviseSequentialRead(f *os.File, prefetch bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// copied from https://github.com/juju/fslock/blob/master/fslock_windows.go
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/minwinbase/ns-minwinbase-overlapped
|
||||
func newOverlapped() (*windows.Overlapped, error) {
|
||||
event, err := createEvent(nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &windows.Overlapped{HEvent: event}, nil
|
||||
}
|
||||
|
||||
// copied from https://github.com/juju/fslock/blob/master/fslock_windows.go
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-createeventa
|
||||
func createEvent(sa *windows.SecurityAttributes, name *uint16) (windows.Handle, error) {
|
||||
r0, _, err := procEvent.Call(uintptr(unsafe.Pointer(sa)), uintptr(1), uintptr(1), uintptr(unsafe.Pointer(name)))
|
||||
handle := windows.Handle(r0)
|
||||
if handle == windows.InvalidHandle {
|
||||
return 0, err
|
||||
}
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/ntddk/ns-ntddk-_file_disposition_information_ex
|
||||
type fileDispositionInformationEx struct {
|
||||
Flags uint32
|
||||
}
|
||||
|
||||
// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/ns-wdm-_io_status_block
|
||||
type ioStatusBlock struct {
|
||||
Status, Information uintptr
|
||||
}
|
||||
|
||||
// UpdateFileHandle changes the file deletion semantics on Windows to POSIX-like ones.
|
||||
func UpdateFileHandle(path string) error {
|
||||
handle, err := windows.Open(path, windows.GENERIC_READ|windows.DELETE, windows.FILE_SHARE_READ|windows.FILE_SHARE_DELETE)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return setPosixDelete(handle)
|
||||
}
|
||||
|
||||
// supported starting with Windows 10, version 1709.
|
||||
// supported by NTFS only.
|
||||
func setPosixDelete(handle windows.Handle) error {
|
||||
var iosb ioStatusBlock
|
||||
flags := fileDispositionInformationEx{
|
||||
Flags: fileDispositionPosixSemantics | fileDispositionIgnoreReadonlyAttribute,
|
||||
}
|
||||
// class FileDispositionInformationEx, // 64
|
||||
// https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/ne-wdm-_file_information_class
|
||||
r0, _, err := ntSetInformationProc.Call(uintptr(handle), uintptr(unsafe.Pointer(&iosb)), uintptr(unsafe.Pointer(&flags)), unsafe.Sizeof(flags), uintptr(64))
|
||||
if r0 == 0 {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("cannot set file disposition information: NT_STATUS: 0x%X, error: %w", r0, err)
|
||||
}
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
var disableMmap = flag.Bool("fs.disableMmap", is32BitPtr, "Whether to use pread() instead of mmap() for reading data files. "+
|
||||
|
@ -65,7 +64,7 @@ func (r *ReaderAt) MustReadAt(p []byte, off int64) {
|
|||
func (r *ReaderAt) MustClose() {
|
||||
fname := r.f.Name()
|
||||
if len(r.mmapData) > 0 {
|
||||
if err := unix.Munmap(r.mmapData[:cap(r.mmapData)]); err != nil {
|
||||
if err := mUnmap(r.mmapData[:cap(r.mmapData)]); err != nil {
|
||||
logger.Panicf("FATAL: cannot unmap data for file %q: %s", fname, err)
|
||||
}
|
||||
r.mmapData = nil
|
||||
|
@ -135,7 +134,7 @@ func mmapFile(f *os.File, size int64) ([]byte, error) {
|
|||
if size%4096 != 0 {
|
||||
size += 4096 - size%4096
|
||||
}
|
||||
data, err := unix.Mmap(int(f.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED)
|
||||
data, err := mmap(int(f.Fd()), int(size))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot mmap file with size %d: %w", size, err)
|
||||
}
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
// +build !windows
|
||||
|
||||
package procutil
|
||||
|
||||
import (
|
||||
|
|
60
lib/procutil/signal_windows.go
Normal file
60
lib/procutil/signal_windows.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
// +build windows
|
||||
|
||||
package procutil
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// WaitForSigterm waits for either SIGTERM or SIGINT
|
||||
//
|
||||
// Returns the caught signal.
|
||||
//
|
||||
// Windows doesn't have the SIGHUP signal.
|
||||
func WaitForSigterm() os.Signal {
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
|
||||
sig := <-ch
|
||||
return sig
|
||||
}
|
||||
|
||||
type sigHUPNotifier struct {
|
||||
lock sync.Mutex
|
||||
subscribers []chan<- os.Signal
|
||||
}
|
||||
|
||||
var notifier sigHUPNotifier
|
||||
|
||||
// https://golang.org/pkg/os/signal/#hdr-Windows
|
||||
// https://github.com/golang/go/issues/6948
|
||||
// SelfSIGHUP sends SIGHUP signal to the subscribed listeners.
|
||||
func SelfSIGHUP() {
|
||||
notifier.notify(syscall.SIGHUP)
|
||||
}
|
||||
|
||||
// NewSighupChan returns a channel, which is triggered on every SelfSIGHUP.
|
||||
func NewSighupChan() <-chan os.Signal {
|
||||
ch := make(chan os.Signal, 1)
|
||||
notifier.subscribe(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
func (sn *sigHUPNotifier) subscribe(sub chan<- os.Signal) {
|
||||
sn.lock.Lock()
|
||||
defer sn.lock.Unlock()
|
||||
sn.subscribers = append(sn.subscribers, sub)
|
||||
}
|
||||
|
||||
func (sn *sigHUPNotifier) notify(sig os.Signal) {
|
||||
sn.lock.Lock()
|
||||
defer sn.lock.Unlock()
|
||||
for _, sub := range sn.subscribers {
|
||||
select {
|
||||
case sub <- sig:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,7 +22,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
||||
)
|
||||
|
||||
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 2*time.Minute, "Timeout for requests to Kuberntes API server")
|
||||
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 10*time.Minute, "How frequently to reload the full state from Kuberntes API server")
|
||||
|
||||
// apiConfig contains config for API server
|
||||
type apiConfig struct {
|
||||
|
@ -345,7 +345,7 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) {
|
|||
if err != nil {
|
||||
logger.Errorf("error when performing a request to %q: %s", requestURL, err)
|
||||
backoffSleep()
|
||||
// There is no sense in reloading resources on non-http errors.
|
||||
resourceVersion = uw.reloadObjects()
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
|
@ -353,27 +353,23 @@ func (uw *urlWatcher) watchForUpdates(resourceVersion string) {
|
|||
_ = resp.Body.Close()
|
||||
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
|
||||
if resp.StatusCode == 410 {
|
||||
// Update stale resourceVersion. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
|
||||
resourceVersion = uw.reloadObjects()
|
||||
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
|
||||
backoffDelay = time.Second
|
||||
} else {
|
||||
backoffSleep()
|
||||
// There is no sense in reloading resources on non-410 status codes.
|
||||
}
|
||||
resourceVersion = uw.reloadObjects()
|
||||
continue
|
||||
}
|
||||
backoffDelay = time.Second
|
||||
err = uw.readObjectUpdateStream(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
// The stream has been closed (probably due to timeout)
|
||||
backoffSleep()
|
||||
continue
|
||||
if !errors.Is(err, io.EOF) {
|
||||
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
|
||||
}
|
||||
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
|
||||
backoffSleep()
|
||||
// There is no sense in reloading resources on non-http errors.
|
||||
resourceVersion = uw.reloadObjects()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue