lib/logstorage: follow-up for 8a23d08c21

- Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes
  directly inside Storage.IsReadOnly(). This should work fast in most cases.
  This simplifies the logic at lib/logstorage.
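
  A minimal sketch of the new check (the field and helper names below follow the diff in this
  commit; fs.MustGetFreeSpace is the existing lib/fs helper):

      // IsReadOnly reports whether the storage must stop accepting new data
      // because the free disk space at s.path dropped below the configured minimum.
      func (s *Storage) IsReadOnly() bool {
          available := fs.MustGetFreeSpace(s.path)
          return available < s.minFreeDiskSpaceBytes
      }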

- Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since
  it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes.
  The background merge logic uses another mechanism for determining whether there is enough
  disk space for the merge - it reserves the needed disk space before the merge
  and releases it after the merge. This prevents out-of-disk-space errors during background merges.
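
  Roughly, the reservation mechanism works as sketched below (condensed from the
  tryReserveDiskSpace / releaseDiskSpace helpers in the diff; reservedDiskSpace is a
  process-wide counter shared by all concurrent merges, atomic is sync/atomic):

      var reservedDiskSpace uint64 // bytes currently reserved by in-flight merges

      // tryReserveDiskSpace reserves n bytes for a merge if enough free space is left at path.
      func tryReserveDiskSpace(path string, n uint64) bool {
          available := fs.MustGetFreeSpace(path)
          reserved := atomic.AddUint64(&reservedDiskSpace, n)
          if available > reserved {
              return true
          }
          releaseDiskSpace(n)
          return false
      }

      // releaseDiskSpace returns n previously reserved bytes.
      func releaseDiskSpace(n uint64) {
          atomic.AddUint64(&reservedDiskSpace, ^(n - 1)) // atomic subtraction of n
      }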

- Properly handle corner cases for flushing in-memory data to disk when the storage
  enters read-only mode. This is better than losing the in-memory data.
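
  The final flush now ignores the free disk space limit, so the in-memory parts always reach disk
  (condensed from mustFlushInmemoryPartsToDisk in the diff below):

      // Do not take the available disk space into account here,
      // since otherwise these parts would be lost.
      pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, math.MaxUint64)
      if len(pwsChunk) == 0 {
          pwsChunk = append(pwsChunk[:0], pws...)
      }
      ddb.mustMergeParts(pwsChunk, true)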

- Bring back Storage.MustAddRows() instead of Storage.AddRows(),
  since the only case when AddRows() could return an error is when the storage is in read-only mode.
  This case must be handled by the caller by calling Storage.IsReadOnly()
  before adding rows to the storage.
  This simplifies the code a bit, since the caller of Storage.MustAddRows() no longer needs to handle
  errors returned by Storage.AddRows().
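
  In the ingestion handlers this translates into the following pattern (condensed from the
  Elasticsearch /_bulk handler changes in the diff below):

      if err := vlstorage.CanWriteData(); err != nil {
          // the storage is in read-only mode; report it to the client
          httpserver.Errorf(w, r, "%s", err)
          return true
      }
      lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
      processLogMessage := cp.GetProcessLogMessageFunc(lr)
      n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
      vlstorage.MustAddRows(lr)
      logstorage.PutLogRows(lr)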

- Properly store the parsed logs in the Storage even if parts of the request contain invalid log lines.
  Previously the parsed logs could be lost in this case.
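
  For example, the Loki JSON handler now adds the already parsed rows to the storage before
  reporting the parse error (condensed from the diff below):

      n, err := parseJSONRequest(data, processLogMessage)
      vlstorage.MustAddRows(lr)
      logstorage.PutLogRows(lr)
      if err != nil {
          httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err)
          return true
      }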

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
Aliaksandr Valialkin 2023-10-02 16:26:02 +02:00
parent cbbdf9cdf5
commit 120f3bc467
21 changed files with 213 additions and 258 deletions


@@ -94,19 +94,20 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
         httpserver.Errorf(w, r, "%s", err)
         return true
     }
+    if err := vlstorage.CanWriteData(); err != nil {
+        httpserver.Errorf(w, r, "%s", err)
+        return true
+    }
     lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
     processLogMessage := cp.GetProcessLogMessageFunc(lr)
     isGzip := r.Header.Get("Content-Encoding") == "gzip"
     n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
+    vlstorage.MustAddRows(lr)
+    logstorage.PutLogRows(lr)
     if err != nil {
         logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
         return true
     }
-    err = vlstorage.AddRows(lr)
-    logstorage.PutLogRows(lr)
-    if err != nil {
-        httpserver.Errorf(w, r, "cannot insert rows: %s", err)
-    }

     tookMs := time.Since(startTime).Milliseconds()
     bw := bufferedwriter.Get(w)
@@ -132,7 +133,7 @@ var (
 )

 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
-    processLogMessage func(timestamp int64, fields []logstorage.Field) error,
+    processLogMessage func(timestamp int64, fields []logstorage.Field),
 ) (int, error) {
     // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
@@ -175,7 +176,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 var lineBufferPool bytesutil.ByteBufferPool

 func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
-    processLogMessage func(timestamp int64, fields []logstorage.Field) error,
+    processLogMessage func(timestamp int64, fields []logstorage.Field),
 ) (bool, error) {
     var line []byte
@@ -222,11 +223,8 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
         ts = time.Now().UnixNano()
     }
     p.RenameField(msgField, "_msg")
-    err = processLogMessage(ts, p.Fields)
+    processLogMessage(ts, p.Fields)
     logjson.PutParser(p)
-    if err != nil {
-        return false, err
-    }
     return true, nil
 }


@@ -15,9 +15,8 @@ func TestReadBulkRequestFailure(t *testing.T) {
     f := func(data string) {
         t.Helper()
-        processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
+        processLogMessage := func(timestamp int64, fields []logstorage.Field) {
             t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
-            return nil
         }

         r := bytes.NewBufferString(data)
@@ -44,7 +43,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
     var timestamps []int64
     var result string
-    processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
+    processLogMessage := func(timestamp int64, fields []logstorage.Field) {
         timestamps = append(timestamps, timestamp)
         a := make([]string, len(fields))
@@ -53,7 +52,6 @@ func TestReadBulkRequestSuccess(t *testing.T) {
         }
         s := "{" + strings.Join(a, ",") + "}\n"
         result += s
-        return nil
     }

     // Read the request without compression


@@ -33,7 +33,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
     timeField := "@timestamp"
     msgField := "message"
-    processLogMessage := func(timestmap int64, fields []logstorage.Field) error { return nil }
+    processLogMessage := func(timestmap int64, fields []logstorage.Field) {}

     b.ReportAllocs()
     b.SetBytes(int64(len(data)))


@@ -72,13 +72,13 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
 }

 // GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
-func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) error {
-    return func(timestamp int64, fields []logstorage.Field) error {
+func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
+    return func(timestamp int64, fields []logstorage.Field) {
         if len(fields) > *MaxFieldsPerLine {
             rf := logstorage.RowFormatter(fields)
             logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
             rowsDroppedTotalTooManyFields.Inc()
-            return nil
+            return
         }
         lr.MustAdd(cp.TenantID, timestamp, fields)
@@ -87,14 +87,12 @@ func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(ti
             lr.ResetKeepSettings()
             logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
             rowsDroppedTotalDebug.Inc()
-            return nil
+            return
         }
         if lr.NeedFlush() {
-            err := vlstorage.AddRows(lr)
+            vlstorage.MustAddRows(lr)
             lr.ResetKeepSettings()
-            return err
         }
-        return nil
     }
 }


@@ -36,6 +36,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
         httpserver.Errorf(w, r, "%s", err)
         return true
     }
+    if err := vlstorage.CanWriteData(); err != nil {
+        httpserver.Errorf(w, r, "%s", err)
+        return true
+    }
     lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
     processLogMessage := cp.GetProcessLogMessageFunc(lr)
@@ -75,12 +79,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
         rowsIngestedTotal.Inc()
     }

-    err = vlstorage.AddRows(lr)
+    vlstorage.MustAddRows(lr)
     logstorage.PutLogRows(lr)
-    if err != nil {
-        httpserver.Errorf(w, r, "cannot insert rows: %s", err)
-        return true
-    }

     // update jsonlineRequestDuration only for successfully parsed requests.
     // There is no need in updating jsonlineRequestDuration for request errors,
@@ -90,7 +90,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
     return true
 }

-func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (bool, error) {
+func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
     var line []byte
     for len(line) == 0 {
         if !sc.Scan() {
@@ -117,11 +117,8 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
         ts = time.Now().UnixNano()
     }
     p.RenameField(msgField, "_msg")
-    err = processLogMessage(ts, p.Fields)
+    processLogMessage(ts, p.Fields)
     logjson.PutParser(p)
-    if err != nil {
-        return false, err
-    }
     return true, nil
 }


@@ -16,7 +16,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
     var timestamps []int64
     var result string
-    processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
+    processLogMessage := func(timestamp int64, fields []logstorage.Field) {
         timestamps = append(timestamps, timestamp)
         a := make([]string, len(fields))
@@ -25,8 +25,6 @@ func TestReadBulkRequestSuccess(t *testing.T) {
         }
         s := "{" + strings.Join(a, ",") + "}\n"
         result += s
-        return nil
     }

     // Read the request without compression


@@ -47,21 +47,20 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
         httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
         return true
     }
+    if err := vlstorage.CanWriteData(); err != nil {
+        httpserver.Errorf(w, r, "%s", err)
+        return true
+    }
     lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
     processLogMessage := cp.GetProcessLogMessageFunc(lr)
     n, err := parseJSONRequest(data, processLogMessage)
+    vlstorage.MustAddRows(lr)
+    logstorage.PutLogRows(lr)
     if err != nil {
-        logstorage.PutLogRows(lr)
-        httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
+        httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err)
         return true
     }
-    err = vlstorage.AddRows(lr)
-    logstorage.PutLogRows(lr)
-    if err != nil {
-        httpserver.Errorf(w, r, "cannot insert rows: %s", err)
-        return true
-    }
     rowsIngestedJSONTotal.Add(n)

     // update lokiRequestJSONDuration only for successfully parsed requests
@@ -78,7 +77,7 @@ var (
     lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
 )

-func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
+func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
     p := parserPool.Get()
     defer parserPool.Put(p)
     v, err := p.ParseBytes(data)
@@ -171,11 +170,7 @@ func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, field
                 Name:  "_msg",
                 Value: bytesutil.ToUnsafeString(msg),
             })
-            err = processLogMessage(ts, fields)
-            if err != nil {
-                return rowsIngested, err
-            }
+            processLogMessage(ts, fields)
         }
         rowsIngested += len(lines)
     }


@@ -11,9 +11,8 @@ import (
 func TestParseJSONRequestFailure(t *testing.T) {
     f := func(s string) {
         t.Helper()
-        n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
+        n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
             t.Fatalf("unexpected call to parseJSONRequest callback!")
-            return nil
         })
         if err == nil {
             t.Fatalf("expecting non-nil error")
@@ -61,14 +60,13 @@ func TestParseJSONRequestSuccess(t *testing.T) {
     f := func(s string, resultExpected string) {
         t.Helper()
         var lines []string
-        n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
+        n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
             var a []string
             for _, f := range fields {
                 a = append(a, f.String())
             }
             line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
             lines = append(lines, line)
-            return nil
         })
         if err != nil {
             t.Fatalf("unexpected error: %s", err)


@@ -27,7 +27,7 @@ func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
     b.RunParallel(func(pb *testing.PB) {
         data := getJSONBody(streams, rows, labels)
         for pb.Next() {
-            _, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) error { return nil })
+            _, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {})
             if err != nil {
                 panic(fmt.Errorf("unexpected error: %s", err))
             }


@@ -39,19 +39,17 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
         httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
         return true
     }
+    if err := vlstorage.CanWriteData(); err != nil {
+        httpserver.Errorf(w, r, "%s", err)
+        return true
+    }
     lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
     processLogMessage := cp.GetProcessLogMessageFunc(lr)
     n, err := parseProtobufRequest(data, processLogMessage)
-    if err != nil {
-        logstorage.PutLogRows(lr)
-        httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
-        return true
-    }
-
-    err = vlstorage.AddRows(lr)
+    vlstorage.MustAddRows(lr)
     logstorage.PutLogRows(lr)
     if err != nil {
-        httpserver.Errorf(w, r, "cannot insert rows: %s", err)
+        httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
         return true
     }
@@ -71,7 +69,7 @@ var (
     lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
 )

-func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
+func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
     bb := bytesBufPool.Get()
     defer bytesBufPool.Put(bb)
@@ -114,10 +112,7 @@ func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, f
             if ts == 0 {
                 ts = currentTimestamp
             }
-            err = processLogMessage(ts, fields)
-            if err != nil {
-                return rowsIngested, err
-            }
+            processLogMessage(ts, fields)
         }
         rowsIngested += len(stream.Entries)
     }


@@ -14,7 +14,7 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
     f := func(s string, resultExpected string) {
         t.Helper()
         var pr PushRequest
-        n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
+        n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
             msg := ""
             for _, f := range fields {
                 if f.Name == "_msg" {
@@ -39,7 +39,6 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
                 },
             },
         })
-            return nil
     })
     if err != nil {
         t.Fatalf("unexpected error: %s", err)
@@ -55,14 +54,13 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
     encodedData := snappy.Encode(nil, data)

     var lines []string
-    n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) error {
+    n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) {
         var a []string
         for _, f := range fields {
             a = append(a, f.String())
         }
         line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
         lines = append(lines, line)
-        return nil
     })
     if err != nil {
         t.Fatalf("unexpected error: %s", err)


@@ -29,7 +29,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
     b.RunParallel(func(pb *testing.PB) {
         body := getProtobufBody(streams, rows, labels)
         for pb.Next() {
-            _, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) error { return nil })
+            _, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {})
             if err != nil {
                 panic(fmt.Errorf("unexpected error: %s", err))
             }


@@ -3,6 +3,7 @@ package vlstorage
 import (
     "flag"
     "fmt"
+    "net/http"
     "sync"
     "time"
@@ -10,6 +11,7 @@ import (
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
@@ -30,7 +32,8 @@ var (
         "see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields ; see also -logIngestedRows")
     logIngestedRows = flag.Bool("logIngestedRows", false, "Whether to log all the ingested log entries; this can be useful for debugging of data ingestion; "+
         "see https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/ ; see also -logNewStreams")
-    minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
+    minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which "+
+        "the storage stops accepting new data")
 )

 // Init initializes vlstorage.
@@ -77,9 +80,23 @@ func Stop() {
 var strg *logstorage.Storage
 var storageMetrics *metrics.Set

-// AddRows adds lr to vlstorage
-func AddRows(lr *logstorage.LogRows) error {
-    return strg.AddRows(lr)
+// CanWriteData returns non-nil error if it cannot write data to vlstorage.
+func CanWriteData() error {
+    if strg.IsReadOnly() {
+        return &httpserver.ErrorWithStatusCode{
+            Err: fmt.Errorf("cannot add rows into storage in read-only mode; the storage can be in read-only mode "+
+                "because of lack of free disk space at -storageDataPath=%s", *storageDataPath),
+            StatusCode: http.StatusTooManyRequests,
+        }
+    }
+    return nil
+}
+
+// MustAddRows adds lr to vlstorage
+//
+// It is advised to call CanWriteData() before calling MustAddRows()
+func MustAddRows(lr *logstorage.LogRows) {
+    strg.MustAddRows(lr)
 }

 // RunQuery runs the given q and calls processBlock for the returned data blocks
@@ -114,7 +131,6 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
         if m().IsReadOnly {
             return 1
         }
-
         return 0
     })


@@ -2,8 +2,8 @@ package logstorage

 import (
     "encoding/json"
-    "errors"
     "fmt"
+    "math"
     "os"
     "path/filepath"
     "sort"
@@ -70,18 +70,15 @@ type datadb struct {
     // stopCh is used for notifying background workers to stop
     stopCh chan struct{}

-    // inmemoryPartsFlushersCount is the number of currently running in-memory parts flushers
+    // oldInmemoryPartsFlushersCount is the number of currently running flushers for old in-memory parts
     //
     // This variable must be accessed under partsLock.
-    inmemoryPartsFlushersCount int
+    oldInmemoryPartsFlushersCount int

     // mergeWorkersCount is the number of currently running merge workers
     //
     // This variable must be accessed under partsLock.
     mergeWorkersCount int
-
-    // isReadOnly indicates whether the storage is in read-only mode.
-    isReadOnly *uint32
 }

 // partWrapper is a wrapper for opened part.
@@ -141,7 +138,7 @@ func mustCreateDatadb(path string) {
 }

 // mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data.
-func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration, isReadOnly *uint32) *datadb {
+func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *datadb {
     // Remove temporary directories, which may be left after unclean shutdown.
     fs.MustRemoveTemporaryDirs(path)
@@ -173,7 +170,6 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration, isR
         path:       path,
         fileParts:  pws,
         stopCh:     make(chan struct{}),
-        isReadOnly: isReadOnly,
     }

     // Start merge workers in the hope they'll merge the remaining parts
@@ -187,51 +183,65 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration, isR
     return ddb
 }

-// startInmemoryPartsFlusherLocked starts flusher for in-memory parts to disk.
+// startOldInmemoryPartsFlusherLocked starts flusher for old in-memory parts to disk.
 //
 // This function must be called under partsLock.
-func (ddb *datadb) startInmemoryPartsFlusherLocked() {
+func (ddb *datadb) startOldInmemoryPartsFlusherLocked() {
     maxWorkers := getMergeWorkersCount()
-    if ddb.inmemoryPartsFlushersCount >= maxWorkers {
+    if ddb.oldInmemoryPartsFlushersCount >= maxWorkers {
         return
     }
-    ddb.inmemoryPartsFlushersCount++
+    ddb.oldInmemoryPartsFlushersCount++
     ddb.wg.Add(1)
     go func() {
-        ddb.flushInmemoryParts()
+        ddb.flushOldInmemoryParts()
         ddb.wg.Done()
     }()
 }

-func (ddb *datadb) flushInmemoryParts() {
+func (ddb *datadb) flushOldInmemoryParts() {
     ticker := time.NewTicker(time.Second)
     defer ticker.Stop()
-    for {
+    var parts, partsToMerge []*partWrapper
+
+    for !needStop(ddb.stopCh) {
         ddb.partsLock.Lock()
-        pws := make([]*partWrapper, 0, len(ddb.inmemoryParts))
-        pws = appendNotInMergePartsLocked(pws, ddb.inmemoryParts)
+        parts = appendNotInMergePartsLocked(parts[:0], ddb.inmemoryParts)
         currentTime := time.Now()
-        partsToFlush := pws[:0]
-        for _, pw := range pws {
+        partsToFlush := parts[:0]
+        for _, pw := range parts {
             if pw.flushDeadline.Before(currentTime) {
                 partsToFlush = append(partsToFlush, pw)
             }
         }
-        setInMergeLocked(partsToFlush)
-        if len(pws) == 0 {
-            ddb.inmemoryPartsFlushersCount--
+        // Do not take into account available disk space when flushing in-memory parts to disk,
+        // since otherwise the outdated in-memory parts may remain in-memory, which, in turn,
+        // may result in increased memory usage plus possible loss of historical data.
+        // It is better to crash on out of disk error in this case.
+        partsToMerge = appendPartsToMerge(partsToMerge[:0], partsToFlush, math.MaxUint64)
+        if len(partsToMerge) == 0 {
+            partsToMerge = append(partsToMerge[:0], partsToFlush...)
+        }
+        setInMergeLocked(partsToMerge)
+        needStop := false
+        if len(ddb.inmemoryParts) == 0 {
+            // There are no in-memory parts, so stop the flusher.
+            needStop = true
+            ddb.oldInmemoryPartsFlushersCount--
         }
         ddb.partsLock.Unlock()

-        if len(pws) == 0 {
-            // There are no in-memory parts, so stop the flusher.
+        if needStop {
             return
         }
-        err := ddb.mergePartsFinal(partsToFlush)
-        if err != nil {
-            logger.Panicf("FATAL: cannot flush inmemory parts to disk: %s", err)
+
+        ddb.mustMergeParts(partsToMerge, true)
+        if len(partsToMerge) < len(partsToFlush) {
+            // Continue merging remaining old in-memory parts from partsToFlush list.
+            continue
         }
+
+        // There are no old in-memory parts to flush. Sleep for a while until these parts appear.
         select {
         case <-ddb.stopCh:
             return
@@ -244,9 +254,6 @@ func (ddb *datadb) flushInmemoryParts() {
 //
 // This function must be called under locked partsLock.
 func (ddb *datadb) startMergeWorkerLocked() {
-    if ddb.IsReadOnly() {
-        return
-    }
     maxWorkers := getMergeWorkersCount()
     if ddb.mergeWorkersCount >= maxWorkers {
         return
@@ -255,11 +262,8 @@ func (ddb *datadb) startMergeWorkerLocked() {
     ddb.wg.Add(1)
     go func() {
         globalMergeLimitCh <- struct{}{}
-        err := ddb.mergeExistingParts()
+        ddb.mustMergeExistingParts()
         <-globalMergeLimitCh
-        if err != nil && !errors.Is(err, errReadOnly) {
-            logger.Panicf("FATAL: background merge failed: %s", err)
-        }
         ddb.wg.Done()
     }()
 }
@@ -279,9 +283,9 @@ func getMergeWorkersCount() int {
     return n
 }

-func (ddb *datadb) mergeExistingParts() error {
+func (ddb *datadb) mustMergeExistingParts() {
     for !needStop(ddb.stopCh) {
-        maxOutBytes := ddb.availableDiskSpace()
+        maxOutBytes := availableDiskSpace(ddb.path)

         ddb.partsLock.Lock()
         parts := make([]*partWrapper, 0, len(ddb.inmemoryParts)+len(ddb.fileParts))
@@ -296,25 +300,11 @@ func (ddb *datadb) mergeExistingParts() error {
         if len(pws) == 0 {
             // Nothing to merge at the moment.
-            return nil
+            return
         }
-        partsSize := getCompressedSize(pws)
-        if !ddb.reserveDiskSpace(partsSize) {
-            // There is no free disk space for the merge,
-            // because concurrent merge workers already reserved the disk space.
-            // Try again with smaller maxOutBytes.
-            ddb.releasePartsToMerge(pws)
-            continue
-        }
-        err := ddb.mergeParts(pws, false)
-        ddb.releaseDiskSpace(partsSize)
-        if err != nil {
-            return err
-        }
+
+        ddb.mustMergeParts(pws, false)
     }
-    return nil
 }

 // appendNotInMergePartsLocked appends src parts with isInMerge=false to dst and returns the result.
@@ -349,29 +339,42 @@ func assertIsInMerge(pws []*partWrapper) {
     }
 }

-var errReadOnly = errors.New("the storage is in read-only mode")
-
-// mergeParts merges pws to a single resulting part.
+// mustMergeParts merges pws to a single resulting part.
 //
-// if isFinal is set, then the resulting part will be saved to disk.
+// if isFinal is set, then the resulting part is guaranteed to be saved to disk.
+// The pws may remain unmerged after returning from the function if there is no enough disk space.
 //
 // All the parts inside pws must have isInMerge field set to true.
 // The isInMerge field inside pws parts is set to false before returning from the function.
-func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
+func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
     if len(pws) == 0 {
         // Nothing to merge.
-        return nil
+        return
     }
-    if ddb.IsReadOnly() {
-        return errReadOnly
-    }
     assertIsInMerge(pws)
     defer ddb.releasePartsToMerge(pws)

     startTime := time.Now()
     dstPartType := ddb.getDstPartType(pws, isFinal)
+    if dstPartType == partFile {
+        // Make sure there is enough disk space for performing the merge
+        partsSize := getCompressedSize(pws)
+        needReleaseDiskSpace := tryReserveDiskSpace(ddb.path, partsSize)
+        if needReleaseDiskSpace {
+            defer releaseDiskSpace(partsSize)
+        } else {
+            if !isFinal {
+                // There is no enough disk space for performing the non-final merge.
+                return
+            }
+            // Try performing final merge even if there is no enough disk space
+            // in order to persist in-memory data to disk.
+            // It is better to crash on out of memory error in this case.
+        }
+    }
     if dstPartType == partInmemory {
         atomic.AddUint64(&ddb.inmemoryMergesTotal, 1)
         atomic.AddUint64(&ddb.inmemoryActiveMerges, 1)
@@ -392,7 +395,7 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
         mp.MustStoreToDisk(dstPartPath)
         pwNew := ddb.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
         ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
-        return nil
+        return
     }

     // Prepare blockStreamReaders for source parts.
@@ -443,7 +446,7 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
         if dstPartType == partFile {
             fs.MustRemoveAll(dstPartPath)
         }
-        return nil
+        return
     }

     // Atomically swap the source parts with the newly created part.
@@ -463,7 +466,7 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
     d := time.Since(startTime)
     if d <= 30*time.Second {
-        return nil
+        return
     }

     // Log stats for long merges.
@@ -471,7 +474,6 @@ func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
     rowsPerSec := int(float64(srcRowsCount) / durationSecs)
     logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",
         len(pws), srcRowsCount, srcBlocksCount, srcSize, dstRowsCount, dstBlocksCount, dstSize, durationSecs, rowsPerSec, dstPartPath)
-    return nil
 }

 func (ddb *datadb) nextMergeIdx() uint64 {
@@ -546,7 +548,7 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
     ddb.partsLock.Lock()
     ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
-    ddb.startInmemoryPartsFlusherLocked()
+    ddb.startOldInmemoryPartsFlusherLocked()
     if len(ddb.inmemoryParts) > defaultPartsToMerge {
         ddb.startMergeWorkerLocked()
     }
@@ -559,9 +561,6 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
 }

 func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool {
-    if ddb.IsReadOnly() {
-        return false
-    }
     if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition {
         return false
     }
@@ -578,15 +577,16 @@ func (ddb *datadb) assistedMergeForInmemoryParts() {
     ddb.partsLock.Lock()
     parts := make([]*partWrapper, 0, len(ddb.inmemoryParts))
     parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
-    pws := appendPartsToMerge(nil, parts, (1<<64)-1)
+    // Do not take into account available disk space when merging in-memory parts,
+    // since otherwise the outdated in-memory parts may remain in-memory, which, in turn,
+    // may result in increased memory usage plus possible loss of historical data.
+    // It is better to crash on out of disk error in this case.
+    pws := make([]*partWrapper, 0, len(parts))
+    pws = appendPartsToMerge(pws[:0], parts, math.MaxUint64)
     setInMergeLocked(pws)
     ddb.partsLock.Unlock()

-    err := ddb.mergeParts(pws, false)
-    if err == nil || errors.Is(err, errReadOnly) {
-        return
-    }
-    logger.Panicf("FATAL: cannot perform assisted merge for in-memory parts: %s", err)
+    ddb.mustMergeParts(pws, false)
 }

 // DatadbStats contains various stats for datadb.
@@ -675,12 +675,18 @@ func (ddb *datadb) debugFlush() {
     // Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts.
 }

-func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
-    assertIsInMerge(pws)
+func (ddb *datadb) mustFlushInmemoryPartsToDisk() {
+    ddb.partsLock.Lock()
+    pws := append([]*partWrapper{}, ddb.inmemoryParts...)
+    setInMergeLocked(pws)
+    ddb.partsLock.Unlock()

     var pwsChunk []*partWrapper
     for len(pws) > 0 {
-        pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, (1<<64)-1)
+        // Do not take into account available disk space when performing the final flush of in-memory parts to disk,
+        // since otherwise these parts will be lost.
+        // It is better to crash on out of disk error in this case.
+        pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, math.MaxUint64)
         if len(pwsChunk) == 0 {
             pwsChunk = append(pwsChunk[:0], pws...)
         }
@@ -691,13 +697,8 @@ func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
             logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
         }

-        err := ddb.mergeParts(pwsChunk, true)
-        if err != nil {
-            ddb.releasePartsToMerge(pws)
-            return err
-        }
+        ddb.mustMergeParts(pwsChunk, true)
     }
-    return nil
 }

 func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} {
@@ -725,7 +726,7 @@ func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, d
     switch dstPartType {
     case partInmemory:
         ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
-        ddb.startInmemoryPartsFlusherLocked()
+        ddb.startOldInmemoryPartsFlusherLocked()
     case partFile:
         ddb.fileParts = append(ddb.fileParts, pwNew)
     default:
@@ -840,8 +841,8 @@ func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) {
     ddb.partsLock.Unlock()
 }

-func (ddb *datadb) availableDiskSpace() uint64 {
-    available := fs.MustGetFreeSpace(ddb.path)
+func availableDiskSpace(path string) uint64 {
+    available := fs.MustGetFreeSpace(path)
     reserved := atomic.LoadUint64(&reservedDiskSpace)
     if available < reserved {
         return 0
@@ -849,22 +850,22 @@ func (ddb *datadb) availableDiskSpace() uint64 {
     return available - reserved
 }

-func (ddb *datadb) reserveDiskSpace(n uint64) bool {
-    available := fs.MustGetFreeSpace(ddb.path)
-    reserved := atomic.AddUint64(&reservedDiskSpace, n)
+func tryReserveDiskSpace(path string, n uint64) bool {
+    available := fs.MustGetFreeSpace(path)
+    reserved := reserveDiskSpace(n)
     if available > reserved {
         return true
     }
-    ddb.releaseDiskSpace(n)
+    releaseDiskSpace(n)
     return false
 }

-func (ddb *datadb) releaseDiskSpace(n uint64) {
-    atomic.AddUint64(&reservedDiskSpace, -n)
+func reserveDiskSpace(n uint64) uint64 {
+    return atomic.AddUint64(&reservedDiskSpace, n)
 }

-func (ddb *datadb) IsReadOnly() bool {
-    return atomic.LoadUint32(ddb.isReadOnly) == 1
+func releaseDiskSpace(n uint64) {
+    atomic.AddUint64(&reservedDiskSpace, ^(n - 1))
 }

 // reservedDiskSpace tracks global reserved disk space for currently executed
@@ -889,22 +890,13 @@ func mustCloseDatadb(ddb *datadb) {
     ddb.wg.Wait()

     // flush in-memory data to disk
-    pws := append([]*partWrapper{}, ddb.inmemoryParts...)
-    setInMergeLocked(pws)
-    err := ddb.mergePartsFinal(pws)
-    if err != nil {
-        logger.Fatalf("FATAL: cannot merge inmemory parts: %s", err)
-    }
-
-    // There is no need in using ddb.partsLock here, since nobody should acces ddb now.
-    for _, pw := range ddb.inmemoryParts {
-        pw.decRef()
-        if pw.refCount != 0 {
-            logger.Panicf("BUG: there are %d references to inmemoryPart", pw.refCount)
-        }
+    ddb.mustFlushInmemoryPartsToDisk()
+    if len(ddb.inmemoryParts) > 0 {
+        logger.Panicf("BUG: the number of in-memory parts must be zero after flushing them to disk; got %d", len(ddb.inmemoryParts))
     }
     ddb.inmemoryParts = nil

+    // close file parts
     for _, pw := range ddb.fileParts {
         pw.decRef()
         if pw.refCount != 0 {


@@ -2976,8 +2976,8 @@ func toUint64Clamp(f float64) uint64 {
     if f < 0 {
         return 0
     }
-    if f > (1<<64)-1 {
-        return (1 << 64) - 1
+    if f > math.MaxUint64 {
+        return math.MaxUint64
     }
     return uint64(f)
 }


@@ -9277,7 +9277,7 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
         timestamp := int64(i) * 1e9
         lr.MustAdd(tenantID, timestamp, fields)
     }
-    _ = s.AddRows(lr)
+    s.MustAddRows(lr)
     PutLogRows(lr)
 }

@@ -9291,6 +9291,6 @@ func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int6
         })
         lr.MustAdd(tenantID, timestamp, fields)
     }
-    _ = s.AddRows(lr)
+    s.MustAddRows(lr)
     PutLogRows(lr)
 }


@@ -7,7 +7,7 @@ import (
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )

-// LogRows holds a set of rows needed for Storage.AddRows
+// LogRows holds a set of rows needed for Storage.MustAddRows
 //
 // LogRows must be obtained via GetLogRows()
 type LogRows struct {


@@ -77,7 +77,7 @@ func mustOpenPartition(s *Storage, path string) *partition {
     // Open datadb
     datadbPath := filepath.Join(path, datadbDirname)
-    pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval, &s.isReadOnly)
+    pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval)

     return pt
 }


@@ -25,10 +25,11 @@ type StorageStats struct {
     // PartitionsCount is the number of partitions in the storage
     PartitionsCount uint64

-    PartitionStats
-
     // IsReadOnly indicates whether the storage is read-only.
     IsReadOnly bool
+
+    // PartitionStats contains partition stats.
+    PartitionStats
 }

 // Reset resets s.
@@ -51,6 +52,9 @@ type StorageConfig struct {
     // Log entries with timestamps bigger than now+FutureRetention are ignored.
     FutureRetention time.Duration

+    // MinFreeDiskSpaceBytes is the minimum free disk space at storage path after which the storage stops accepting new data.
+    MinFreeDiskSpaceBytes int64
+
     // LogNewStreams indicates whether to log newly created log streams.
     //
     // This can be useful for debugging of high cardinality issues.
@@ -61,9 +65,6 @@ type StorageConfig struct {
     //
     // This can be useful for debugging of data ingestion.
     LogIngestedRows bool
-
-    // MinFreeDiskSpaceBytes is the minimum free disk space at -storageDataPath after which the storage stops accepting new data
-    MinFreeDiskSpaceBytes int64
 }

 // Storage is the storage for log entries.
@@ -85,6 +86,9 @@ type Storage struct {
     // futureRetention is the maximum allowed interval to write data into the future
     futureRetention time.Duration

+    // minFreeDiskSpaceBytes is the minimum free disk space at path after which the storage stops accepting new data
+    minFreeDiskSpaceBytes uint64
+
     // logNewStreams instructs to log new streams if it is set to true
     logNewStreams bool
@@ -132,10 +136,6 @@ type Storage struct {
     //
     // It reduces the load on persistent storage during querying by _stream:{...} filter.
     streamFilterCache *workingsetcache.Cache
-
-    isReadOnly uint32
-
-    freeDiskSpaceWatcherWG sync.WaitGroup
 }

 type partitionWrapper struct {
@@ -225,6 +225,11 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
         futureRetention = 24 * time.Hour
     }

+    var minFreeDiskSpaceBytes uint64
+    if cfg.MinFreeDiskSpaceBytes >= 0 {
+        minFreeDiskSpaceBytes = uint64(cfg.MinFreeDiskSpaceBytes)
+    }
+
     if !fs.IsPathExist(path) {
         mustCreateStorage(path)
     }
@@ -241,14 +246,15 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
     streamFilterCache := workingsetcache.New(mem / 10)

     s := &Storage{
-        path:            path,
-        retention:       retention,
-        flushInterval:   flushInterval,
-        futureRetention: futureRetention,
-        logNewStreams:   cfg.LogNewStreams,
-        logIngestedRows: cfg.LogIngestedRows,
-        flockF:          flockF,
-        stopCh:          make(chan struct{}),
+        path:                  path,
+        retention:             retention,
+        flushInterval:         flushInterval,
+        futureRetention:       futureRetention,
+        minFreeDiskSpaceBytes: minFreeDiskSpaceBytes,
+        logNewStreams:         cfg.LogNewStreams,
+        logIngestedRows:       cfg.LogIngestedRows,
+        flockF:                flockF,
+        stopCh:                make(chan struct{}),

         streamIDCache:   streamIDCache,
         streamTagsCache: streamTagsCache,
@@ -298,7 +304,6 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
     s.partitions = ptws

     s.runRetentionWatcher()
-    s.startFreeDiskSpaceWatcher(uint64(cfg.MinFreeDiskSpaceBytes))
     return s
 }
@@ -368,7 +373,6 @@ func (s *Storage) MustClose() {
     // Stop background workers
     close(s.stopCh)
     s.wg.Wait()
-    s.freeDiskSpaceWatcherWG.Wait()

     // Close partitions
     for _, pw := range s.partitions {
@@ -401,12 +405,11 @@ func (s *Storage) MustClose() {
     s.path = ""
 }

-// AddRows adds lr to s.
-func (s *Storage) AddRows(lr *LogRows) error {
-    if s.IsReadOnly() {
-        return errReadOnly
-    }
-
+// MustAddRows adds lr to s.
+//
+// It is recommended checking whether the s is in read-only mode by calling IsReadOnly()
+// before calling MustAddRows.
+func (s *Storage) MustAddRows(lr *LogRows) {
     // Fast path - try adding all the rows to the hot partition
     s.partitionsLock.Lock()
     ptwHot := s.ptwHot
@@ -419,7 +422,7 @@ func (s *Storage) AddRows(lr *LogRows) error {
         if ptwHot.canAddAllRows(lr) {
             ptwHot.pt.mustAddRows(lr)
             ptwHot.decRef()
-            return nil
+            return
         }
         ptwHot.decRef()
     }
@@ -463,7 +466,6 @@ func (s *Storage) AddRows(lr *LogRows) error {
         ptw.decRef()
         PutLogRows(lrPart)
     }
-    return nil
 }

 var tooSmallTimestampLogger = logger.WithThrottler("too_small_timestamp", 5*time.Second)
@@ -532,44 +534,14 @@ func (s *Storage) UpdateStats(ss *StorageStats) {
         ptw.pt.updateStats(&ss.PartitionStats)
     }
     s.partitionsLock.Unlock()

     ss.IsReadOnly = s.IsReadOnly()
 }

-// IsReadOnly returns information is storage in read only mode
+// IsReadOnly returns true if s is in read-only mode.
 func (s *Storage) IsReadOnly() bool {
-    return atomic.LoadUint32(&s.isReadOnly) == 1
-}
-
-func (s *Storage) startFreeDiskSpaceWatcher(freeDiskSpaceLimitBytes uint64) {
-    f := func() {
-        freeSpaceBytes := fs.MustGetFreeSpace(s.path)
-        if freeSpaceBytes < freeDiskSpaceLimitBytes {
-            // Switch the storage to readonly mode if there is no enough free space left at s.path
-            logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
-                s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
-            atomic.StoreUint32(&s.isReadOnly, 1)
-            return
-        }
-        if atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
-            logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
-                s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
-        }
-    }
-    f()
-    s.freeDiskSpaceWatcherWG.Add(1)
-    go func() {
-        defer s.freeDiskSpaceWatcherWG.Done()
-        ticker := time.NewTicker(time.Second)
-        defer ticker.Stop()
-        for {
-            select {
-            case <-s.stopCh:
-                return
-            case <-ticker.C:
-                f()
-            }
-        }
-    }()
+    available := fs.MustGetFreeSpace(s.path)
+    return available < s.minFreeDiskSpaceBytes
 }
func (s *Storage) debugFlush() { func (s *Storage) debugFlush() {


@@ -70,7 +70,7 @@ func TestStorageRunQuery(t *testing.T) {
                 })
                 lr.MustAdd(tenantID, timestamp, fields)
             }
-            _ = s.AddRows(lr)
+            s.MustAddRows(lr)
             PutLogRows(lr)
         }
     }
@@ -366,7 +366,7 @@ func TestStorageSearch(t *testing.T) {
                 })
                 lr.MustAdd(tenantID, timestamp, fields)
             }
-            _ = s.AddRows(lr)
+            s.MustAddRows(lr)
             PutLogRows(lr)
         }
     }


@@ -32,7 +32,7 @@ func TestStorageMustAddRows(t *testing.T) {
     lr := newTestLogRows(1, 1, 0)
     lr.timestamps[0] = time.Now().UTC().UnixNano()
     totalRowsCount += uint64(len(lr.timestamps))
-    _ = s.AddRows(lr)
+    s.MustAddRows(lr)
     sStats.Reset()
     s.UpdateStats(&sStats)
     if n := sStats.RowsCount(); n != totalRowsCount {
@@ -56,7 +56,7 @@ func TestStorageMustAddRows(t *testing.T) {
         lr.timestamps[i] = time.Now().UTC().UnixNano()
     }
     totalRowsCount += uint64(len(lr.timestamps))
-    _ = s.AddRows(lr)
+    s.MustAddRows(lr)
     sStats.Reset()
     s.UpdateStats(&sStats)
     if n := sStats.RowsCount(); n != totalRowsCount {
@@ -80,7 +80,7 @@ func TestStorageMustAddRows(t *testing.T) {
         now += nsecPerDay
     }
     totalRowsCount += uint64(len(lr.timestamps))
-    _ = s.AddRows(lr)
+    s.MustAddRows(lr)
     sStats.Reset()
     s.UpdateStats(&sStats)
     if n := sStats.RowsCount(); n != totalRowsCount {