mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
df99965564
Callers of this function log the returned error and exit. It is better logging the error together with the path to the filename and call stack directly inside the function. This simplifies the code at callers' side without reducing the level of debuggability
329 lines
7.8 KiB
Go
329 lines
7.8 KiB
Go
package filestream
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
const dontNeedBlockSize = 16 * 1024 * 1024
|
|
|
|
// ReadCloser is a standard interface for filestream Reader.
|
|
type ReadCloser interface {
|
|
Path() string
|
|
Read(p []byte) (int, error)
|
|
MustClose()
|
|
}
|
|
|
|
// WriteCloser is a standard interface for filestream Writer.
|
|
type WriteCloser interface {
|
|
Path() string
|
|
Write(p []byte) (int, error)
|
|
MustClose()
|
|
}
|
|
|
|
func getBufferSize() int {
|
|
bufferSizeOnce.Do(func() {
|
|
n := memory.Allowed() / 1024 / 8
|
|
if n < 4*1024 {
|
|
n = 4 * 1024
|
|
}
|
|
if n > 512*1024 {
|
|
n = 512 * 1024
|
|
}
|
|
bufferSize = n
|
|
})
|
|
return bufferSize
|
|
}
|
|
|
|
var (
|
|
bufferSize int
|
|
bufferSizeOnce sync.Once
|
|
)
|
|
|
|
// Reader implements buffered file reader.
|
|
type Reader struct {
|
|
f *os.File
|
|
br *bufio.Reader
|
|
st streamTracker
|
|
}
|
|
|
|
// Path returns the path to r
|
|
func (r *Reader) Path() string {
|
|
return r.f.Name()
|
|
}
|
|
|
|
// OpenReaderAt opens the file at the given path in nocache mode at the given offset.
|
|
//
|
|
// If nocache is set, then the reader doesn't pollute OS page cache.
|
|
func OpenReaderAt(path string, offset int64, nocache bool) (*Reader, error) {
|
|
r := MustOpen(path, nocache)
|
|
n, err := r.f.Seek(offset, io.SeekStart)
|
|
if err != nil {
|
|
r.MustClose()
|
|
return nil, fmt.Errorf("cannot seek to offset=%d for %q: %w", offset, path, err)
|
|
}
|
|
if n != offset {
|
|
r.MustClose()
|
|
return nil, fmt.Errorf("invalid seek offset for %q; got %d; want %d", path, n, offset)
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// MustOpen opens the file from the given path in nocache mode.
|
|
//
|
|
// If nocache is set, then the reader doesn't pollute OS page cache.
|
|
func MustOpen(path string, nocache bool) *Reader {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot open file: %s", err)
|
|
}
|
|
r := &Reader{
|
|
f: f,
|
|
br: getBufioReader(f),
|
|
}
|
|
if nocache {
|
|
r.st.fd = f.Fd()
|
|
}
|
|
readersCount.Inc()
|
|
return r
|
|
}
|
|
|
|
// MustClose closes the underlying file passed to MustOpen.
|
|
func (r *Reader) MustClose() {
|
|
if err := r.st.close(); err != nil {
|
|
logger.Panicf("FATAL: cannot close streamTracker for file %q: %s", r.f.Name(), err)
|
|
}
|
|
if err := r.f.Close(); err != nil {
|
|
logger.Panicf("FATAL: cannot close file %q: %s", r.f.Name(), err)
|
|
}
|
|
r.f = nil
|
|
|
|
putBufioReader(r.br)
|
|
r.br = nil
|
|
|
|
readersCount.Dec()
|
|
}
|
|
|
|
var (
|
|
readDuration = metrics.NewFloatCounter(`vm_filestream_read_duration_seconds_total`)
|
|
readCallsBuffered = metrics.NewCounter(`vm_filestream_buffered_read_calls_total`)
|
|
readCallsReal = metrics.NewCounter(`vm_filestream_real_read_calls_total`)
|
|
readBytesBuffered = metrics.NewCounter(`vm_filestream_buffered_read_bytes_total`)
|
|
readBytesReal = metrics.NewCounter(`vm_filestream_real_read_bytes_total`)
|
|
readersCount = metrics.NewCounter(`vm_filestream_readers`)
|
|
)
|
|
|
|
// Read reads file contents to p.
|
|
func (r *Reader) Read(p []byte) (int, error) {
|
|
startTime := time.Now()
|
|
defer func() {
|
|
d := time.Since(startTime).Seconds()
|
|
readDuration.Add(d)
|
|
}()
|
|
readCallsBuffered.Inc()
|
|
n, err := r.br.Read(p)
|
|
readBytesBuffered.Add(n)
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
if err := r.st.adviseDontNeed(n, false); err != nil {
|
|
return n, fmt.Errorf("advise error for %q: %w", r.f.Name(), err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
type statReader struct {
|
|
*os.File
|
|
}
|
|
|
|
func (sr *statReader) Read(p []byte) (int, error) {
|
|
readCallsReal.Inc()
|
|
n, err := sr.File.Read(p)
|
|
readBytesReal.Add(n)
|
|
return n, err
|
|
}
|
|
|
|
func getBufioReader(f *os.File) *bufio.Reader {
|
|
sr := &statReader{f}
|
|
v := brPool.Get()
|
|
if v == nil {
|
|
return bufio.NewReaderSize(sr, getBufferSize())
|
|
}
|
|
br := v.(*bufio.Reader)
|
|
br.Reset(sr)
|
|
return br
|
|
}
|
|
|
|
func putBufioReader(br *bufio.Reader) {
|
|
brPool.Put(br)
|
|
}
|
|
|
|
var brPool sync.Pool
|
|
|
|
// Writer implements buffered file writer.
|
|
type Writer struct {
|
|
f *os.File
|
|
bw *bufio.Writer
|
|
st streamTracker
|
|
}
|
|
|
|
// Path returns the path to r
|
|
func (w *Writer) Path() string {
|
|
return w.f.Name()
|
|
}
|
|
|
|
// OpenWriterAt opens the file at path in nocache mode for writing at the given offset.
|
|
//
|
|
// The file at path is created if it is missing.
|
|
//
|
|
// If nocache is set, the writer doesn't pollute OS page cache.
|
|
func OpenWriterAt(path string, offset int64, nocache bool) (*Writer, error) {
|
|
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
n, err := f.Seek(offset, io.SeekStart)
|
|
if err != nil {
|
|
_ = f.Close()
|
|
return nil, fmt.Errorf("cannot seek to offset=%d in %q: %w", offset, path, err)
|
|
}
|
|
if n != offset {
|
|
_ = f.Close()
|
|
return nil, fmt.Errorf("invalid seek offset for %q; got %d; want %d", path, n, offset)
|
|
}
|
|
return newWriter(f, nocache), nil
|
|
}
|
|
|
|
// MustCreate creates the file for the given path in nocache mode.
|
|
//
|
|
// If nocache is set, the writer doesn't pollute OS page cache.
|
|
func MustCreate(path string, nocache bool) *Writer {
|
|
f, err := os.Create(path)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot create file %q: %s", path, err)
|
|
}
|
|
return newWriter(f, nocache)
|
|
}
|
|
|
|
func newWriter(f *os.File, nocache bool) *Writer {
|
|
w := &Writer{
|
|
f: f,
|
|
bw: getBufioWriter(f),
|
|
}
|
|
if nocache {
|
|
w.st.fd = f.Fd()
|
|
}
|
|
writersCount.Inc()
|
|
return w
|
|
}
|
|
|
|
// MustClose syncs the underlying file to storage and then closes it.
|
|
func (w *Writer) MustClose() {
|
|
if err := w.bw.Flush(); err != nil {
|
|
logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
|
|
}
|
|
putBufioWriter(w.bw)
|
|
w.bw = nil
|
|
|
|
if err := w.f.Sync(); err != nil {
|
|
logger.Panicf("FATAL: cannot sync file %q: %d", w.f.Name(), err)
|
|
}
|
|
if err := w.st.close(); err != nil {
|
|
logger.Panicf("FATAL: cannot close streamTracker for file %q: %s", w.f.Name(), err)
|
|
}
|
|
if err := w.f.Close(); err != nil {
|
|
logger.Panicf("FATAL: cannot close file %q: %s", w.f.Name(), err)
|
|
}
|
|
w.f = nil
|
|
|
|
writersCount.Dec()
|
|
}
|
|
|
|
var (
|
|
writeDuration = metrics.NewFloatCounter(`vm_filestream_write_duration_seconds_total`)
|
|
writeCallsBuffered = metrics.NewCounter(`vm_filestream_buffered_write_calls_total`)
|
|
writeCallsReal = metrics.NewCounter(`vm_filestream_real_write_calls_total`)
|
|
writtenBytesBuffered = metrics.NewCounter(`vm_filestream_buffered_written_bytes_total`)
|
|
writtenBytesReal = metrics.NewCounter(`vm_filestream_real_written_bytes_total`)
|
|
writersCount = metrics.NewCounter(`vm_filestream_writers`)
|
|
)
|
|
|
|
// Write writes p to the underlying file.
|
|
func (w *Writer) Write(p []byte) (int, error) {
|
|
startTime := time.Now()
|
|
defer func() {
|
|
d := time.Since(startTime).Seconds()
|
|
writeDuration.Add(d)
|
|
}()
|
|
writeCallsBuffered.Inc()
|
|
n, err := w.bw.Write(p)
|
|
writtenBytesBuffered.Add(n)
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
if err := w.st.adviseDontNeed(n, true); err != nil {
|
|
return n, fmt.Errorf("advise error for %q: %w", w.f.Name(), err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// MustFlush flushes all the buffered data to file.
|
|
//
|
|
// if isSync is true, then the flushed data is fsynced to the underlying storage.
|
|
func (w *Writer) MustFlush(isSync bool) {
|
|
startTime := time.Now()
|
|
defer func() {
|
|
d := time.Since(startTime).Seconds()
|
|
writeDuration.Add(d)
|
|
}()
|
|
if err := w.bw.Flush(); err != nil {
|
|
logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
|
|
}
|
|
if isSync {
|
|
if err := w.f.Sync(); err != nil {
|
|
logger.Panicf("FATAL: cannot fsync data to the underlying storage for file %q: %s", w.f.Name(), err)
|
|
}
|
|
}
|
|
}
|
|
|
|
type statWriter struct {
|
|
*os.File
|
|
}
|
|
|
|
func (sw *statWriter) Write(p []byte) (int, error) {
|
|
writeCallsReal.Inc()
|
|
n, err := sw.File.Write(p)
|
|
writtenBytesReal.Add(n)
|
|
return n, err
|
|
}
|
|
|
|
func getBufioWriter(f *os.File) *bufio.Writer {
|
|
sw := &statWriter{f}
|
|
v := bwPool.Get()
|
|
if v == nil {
|
|
return bufio.NewWriterSize(sw, getBufferSize())
|
|
}
|
|
bw := v.(*bufio.Writer)
|
|
bw.Reset(sw)
|
|
return bw
|
|
}
|
|
|
|
func putBufioWriter(bw *bufio.Writer) {
|
|
bwPool.Put(bw)
|
|
}
|
|
|
|
var bwPool sync.Pool
|
|
|
|
type streamTracker struct {
|
|
fd uintptr
|
|
offset uint64
|
|
length uint64
|
|
}
|