VictoriaMetrics/lib/filestream/filestream.go
Zongyang b478c4f56f
FIX bottomk doesn't return any data when there are no time range overlap between timeseries (#5509)
* FIX sort order in bottomk

* Add lessWithNaNsReversed for bottomk

* Add ut for TopK

* Move lt from loop

* FIX lint

* FIX lint

* FIX lint

* Mod log format

---------

Co-authored-by: xiaozongyang <xiaozngyang@kanyun.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2024-01-16 22:19:51 +02:00

320 lines
7.6 KiB
Go

package filestream
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/metrics"
)
const dontNeedBlockSize = 16 * 1024 * 1024
// ReadCloser is a standard interface for filestream Reader.
type ReadCloser interface {
Read(p []byte) (int, error)
MustClose()
}
// WriteCloser is a standard interface for filestream Writer.
type WriteCloser interface {
Write(p []byte) (int, error)
MustClose()
}
func getBufferSize() int {
bufferSizeOnce.Do(func() {
n := memory.Allowed() / 1024 / 8
if n < 4*1024 {
n = 4 * 1024
}
if n > 512*1024 {
n = 512 * 1024
}
bufferSize = n
})
return bufferSize
}
var (
bufferSize int
bufferSizeOnce sync.Once
)
// Reader implements buffered file reader.
type Reader struct {
f *os.File
br *bufio.Reader
st streamTracker
}
// OpenReaderAt opens the file at the given path in nocache mode at the given offset.
//
// If nocache is set, then the reader doesn't pollute OS page cache.
func OpenReaderAt(path string, offset int64, nocache bool) (*Reader, error) {
r, err := Open(path, nocache)
if err != nil {
return nil, err
}
n, err := r.f.Seek(offset, io.SeekStart)
if err != nil {
r.MustClose()
return nil, fmt.Errorf("cannot seek to offset=%d for %q: %w", offset, path, err)
}
if n != offset {
r.MustClose()
return nil, fmt.Errorf("invalid seek offset for %q; got %d; want %d", path, n, offset)
}
return r, nil
}
// Open opens the file from the given path in nocache mode.
//
// If nocache is set, then the reader doesn't pollute OS page cache.
func Open(path string, nocache bool) (*Reader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
r := &Reader{
f: f,
br: getBufioReader(f),
}
if nocache {
r.st.fd = f.Fd()
}
readersCount.Inc()
return r, nil
}
// MustClose closes the underlying file passed to Open.
func (r *Reader) MustClose() {
if err := r.st.close(); err != nil {
logger.Panicf("FATAL: cannot close streamTracker for file %q: %s", r.f.Name(), err)
}
if err := r.f.Close(); err != nil {
logger.Panicf("FATAL: cannot close file %q: %s", r.f.Name(), err)
}
r.f = nil
putBufioReader(r.br)
r.br = nil
readersCount.Dec()
}
var (
readDuration = metrics.NewFloatCounter(`vm_filestream_read_duration_seconds_total`)
readCallsBuffered = metrics.NewCounter(`vm_filestream_buffered_read_calls_total`)
readCallsReal = metrics.NewCounter(`vm_filestream_real_read_calls_total`)
readBytesBuffered = metrics.NewCounter(`vm_filestream_buffered_read_bytes_total`)
readBytesReal = metrics.NewCounter(`vm_filestream_real_read_bytes_total`)
readersCount = metrics.NewCounter(`vm_filestream_readers`)
)
// Read reads file contents to p.
func (r *Reader) Read(p []byte) (int, error) {
startTime := time.Now()
defer func() {
d := time.Since(startTime).Seconds()
readDuration.Add(d)
}()
readCallsBuffered.Inc()
n, err := r.br.Read(p)
readBytesBuffered.Add(n)
if err != nil {
return n, err
}
if err := r.st.adviseDontNeed(n, false); err != nil {
return n, fmt.Errorf("advise error for %q: %w", r.f.Name(), err)
}
return n, nil
}
type statReader struct {
*os.File
}
func (sr *statReader) Read(p []byte) (int, error) {
readCallsReal.Inc()
n, err := sr.File.Read(p)
readBytesReal.Add(n)
return n, err
}
func getBufioReader(f *os.File) *bufio.Reader {
sr := &statReader{f}
v := brPool.Get()
if v == nil {
return bufio.NewReaderSize(sr, getBufferSize())
}
br := v.(*bufio.Reader)
br.Reset(sr)
return br
}
func putBufioReader(br *bufio.Reader) {
brPool.Put(br)
}
var brPool sync.Pool
// Writer implements buffered file writer.
type Writer struct {
f *os.File
bw *bufio.Writer
st streamTracker
}
// OpenWriterAt opens the file at path in nocache mode for writing at the given offset.
//
// The file at path is created if it is missing.
//
// If nocache is set, the writer doesn't pollute OS page cache.
func OpenWriterAt(path string, offset int64, nocache bool) (*Writer, error) {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
n, err := f.Seek(offset, io.SeekStart)
if err != nil {
_ = f.Close()
return nil, fmt.Errorf("cannot seek to offset=%d in %q: %w", offset, path, err)
}
if n != offset {
_ = f.Close()
return nil, fmt.Errorf("invalid seek offset for %q; got %d; want %d", path, n, offset)
}
return newWriter(f, nocache), nil
}
// Create creates the file for the given path in nocache mode.
//
// If nocache is set, the writer doesn't pollute OS page cache.
func Create(path string, nocache bool) (*Writer, error) {
f, err := os.Create(path)
if err != nil {
return nil, fmt.Errorf("cannot create file %q: %w", path, err)
}
return newWriter(f, nocache), nil
}
func newWriter(f *os.File, nocache bool) *Writer {
w := &Writer{
f: f,
bw: getBufioWriter(f),
}
if nocache {
w.st.fd = f.Fd()
}
writersCount.Inc()
return w
}
// MustClose syncs the underlying file to storage and then closes it.
func (w *Writer) MustClose() {
if err := w.bw.Flush(); err != nil {
logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
}
putBufioWriter(w.bw)
w.bw = nil
if err := w.f.Sync(); err != nil {
logger.Panicf("FATAL: cannot sync file %q: %d", w.f.Name(), err)
}
if err := w.st.close(); err != nil {
logger.Panicf("FATAL: cannot close streamTracker for file %q: %s", w.f.Name(), err)
}
if err := w.f.Close(); err != nil {
logger.Panicf("FATAL: cannot close file %q: %s", w.f.Name(), err)
}
w.f = nil
writersCount.Dec()
}
var (
writeDuration = metrics.NewFloatCounter(`vm_filestream_write_duration_seconds_total`)
writeCallsBuffered = metrics.NewCounter(`vm_filestream_buffered_write_calls_total`)
writeCallsReal = metrics.NewCounter(`vm_filestream_real_write_calls_total`)
writtenBytesBuffered = metrics.NewCounter(`vm_filestream_buffered_written_bytes_total`)
writtenBytesReal = metrics.NewCounter(`vm_filestream_real_written_bytes_total`)
writersCount = metrics.NewCounter(`vm_filestream_writers`)
)
// Write writes p to the underlying file.
func (w *Writer) Write(p []byte) (int, error) {
startTime := time.Now()
defer func() {
d := time.Since(startTime).Seconds()
writeDuration.Add(d)
}()
writeCallsBuffered.Inc()
n, err := w.bw.Write(p)
writtenBytesBuffered.Add(n)
if err != nil {
return n, err
}
if err := w.st.adviseDontNeed(n, true); err != nil {
return n, fmt.Errorf("advise error for %q: %w", w.f.Name(), err)
}
return n, nil
}
// MustFlush flushes all the buffered data to file.
//
// if isSync is true, then the flushed data is fsynced to the underlying storage.
func (w *Writer) MustFlush(isSync bool) {
startTime := time.Now()
defer func() {
d := time.Since(startTime).Seconds()
writeDuration.Add(d)
}()
if err := w.bw.Flush(); err != nil {
logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
}
if isSync {
if err := w.f.Sync(); err != nil {
logger.Panicf("FATAL: cannot fsync data to the underlying storage for file %q: %s", w.f.Name(), err)
}
}
}
type statWriter struct {
*os.File
}
func (sw *statWriter) Write(p []byte) (int, error) {
writeCallsReal.Inc()
n, err := sw.File.Write(p)
writtenBytesReal.Add(n)
return n, err
}
func getBufioWriter(f *os.File) *bufio.Writer {
sw := &statWriter{f}
v := bwPool.Get()
if v == nil {
return bufio.NewWriterSize(sw, getBufferSize())
}
bw := v.(*bufio.Writer)
bw.Reset(sw)
return bw
}
func putBufioWriter(bw *bufio.Writer) {
bwPool.Put(bw)
}
var bwPool sync.Pool
type streamTracker struct {
fd uintptr
offset uint64 // nolint
length uint64 // nolint
}