diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go
index d511d37295..a7fb24823b 100644
--- a/app/vlinsert/elasticsearch/elasticsearch.go
+++ b/app/vlinsert/elasticsearch/elasticsearch.go
@@ -94,19 +94,20 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 			httpserver.Errorf(w, r, "%s", err)
 			return true
 		}
+		if err := vlstorage.CanWriteData(); err != nil {
+			httpserver.Errorf(w, r, "%s", err)
+			return true
+		}
 		lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
 		processLogMessage := cp.GetProcessLogMessageFunc(lr)
 		isGzip := r.Header.Get("Content-Encoding") == "gzip"
 		n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
+		vlstorage.MustAddRows(lr)
+		logstorage.PutLogRows(lr)
 		if err != nil {
 			logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
 			return true
 		}
-		err = vlstorage.AddRows(lr)
-		logstorage.PutLogRows(lr)
-		if err != nil {
-			httpserver.Errorf(w, r, "cannot insert rows: %s", err)
-		}
 
 		tookMs := time.Since(startTime).Milliseconds()
 		bw := bufferedwriter.Get(w)
@@ -132,7 +133,7 @@ var (
 )
 
 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
-	processLogMessage func(timestamp int64, fields []logstorage.Field) error,
+	processLogMessage func(timestamp int64, fields []logstorage.Field),
 ) (int, error) {
 	// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 
@@ -175,7 +176,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 
 var lineBufferPool bytesutil.ByteBufferPool
 
 func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
-	processLogMessage func(timestamp int64, fields []logstorage.Field) error,
+	processLogMessage func(timestamp int64, fields []logstorage.Field),
 ) (bool, error) {
 	var line []byte
@@ -222,11 +223,8 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 		ts = time.Now().UnixNano()
 	}
 	p.RenameField(msgField, "_msg")
-	err = processLogMessage(ts, p.Fields)
+	processLogMessage(ts, p.Fields)
 	logjson.PutParser(p)
-	if err != nil {
-		return false, err
-	}
 
 	return true, nil
 }
diff --git a/app/vlinsert/elasticsearch/elasticsearch_test.go b/app/vlinsert/elasticsearch/elasticsearch_test.go
index 3935e0dee6..09d1bf770c 100644
--- a/app/vlinsert/elasticsearch/elasticsearch_test.go
+++ b/app/vlinsert/elasticsearch/elasticsearch_test.go
@@ -15,9 +15,8 @@ func TestReadBulkRequestFailure(t *testing.T) {
 	f := func(data string) {
 		t.Helper()
 
-		processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
+		processLogMessage := func(timestamp int64, fields []logstorage.Field) {
 			t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
-			return nil
 		}
 
 		r := bytes.NewBufferString(data)
@@ -44,7 +43,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 
 		var timestamps []int64
 		var result string
-		processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
+		processLogMessage := func(timestamp int64, fields []logstorage.Field) {
 			timestamps = append(timestamps, timestamp)
 
 			a := make([]string, len(fields))
@@ -53,7 +52,6 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 			}
 			s := "{" + strings.Join(a, ",") + "}\n"
 			result += s
-			return nil
 		}
 
 		// Read the request without compression
diff --git a/app/vlinsert/elasticsearch/elasticsearch_timing_test.go b/app/vlinsert/elasticsearch/elasticsearch_timing_test.go
index 5d8cca1b29..9a50fe0ebe 100644
--- a/app/vlinsert/elasticsearch/elasticsearch_timing_test.go
+++ b/app/vlinsert/elasticsearch/elasticsearch_timing_test.go
@@ -33,7 +33,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
 	timeField := "@timestamp"
 	msgField := "message"
-	processLogMessage := func(timestmap int64, fields []logstorage.Field) error { return nil }
+	processLogMessage := func(timestamp int64, fields []logstorage.Field) {}
 
 	b.ReportAllocs()
 	b.SetBytes(int64(len(data)))
 
diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go
index 1852f22337..23f100775d 100644
--- a/app/vlinsert/insertutils/common_params.go
+++ b/app/vlinsert/insertutils/common_params.go
@@ -72,13 +72,13 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
 }
 
 // GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
-func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) error {
-	return func(timestamp int64, fields []logstorage.Field) error {
+func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
+	return func(timestamp int64, fields []logstorage.Field) {
 		if len(fields) > *MaxFieldsPerLine {
 			rf := logstorage.RowFormatter(fields)
 			logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
 			rowsDroppedTotalTooManyFields.Inc()
-			return nil
+			return
 		}
 
 		lr.MustAdd(cp.TenantID, timestamp, fields)
@@ -87,14 +87,12 @@ func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(ti
 			lr.ResetKeepSettings()
 			logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
 			rowsDroppedTotalDebug.Inc()
-			return nil
+			return
 		}
 
 		if lr.NeedFlush() {
-			err := vlstorage.AddRows(lr)
+			vlstorage.MustAddRows(lr)
 			lr.ResetKeepSettings()
-			return err
 		}
-		return nil
 	}
 }
diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go
index 863cf20479..caa86ccbfb 100644
--- a/app/vlinsert/jsonline/jsonline.go
+++ b/app/vlinsert/jsonline/jsonline.go
@@ -36,6 +36,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		httpserver.Errorf(w, r, "%s", err)
 		return true
 	}
+	if err := vlstorage.CanWriteData(); err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return true
+	}
 	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
 	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 
@@ -75,12 +79,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		rowsIngestedTotal.Inc()
 	}
 
-	err = vlstorage.AddRows(lr)
+	vlstorage.MustAddRows(lr)
 	logstorage.PutLogRows(lr)
-	if err != nil {
-		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
-		return true
-	}
 
 	// update jsonlineRequestDuration only for successfully parsed requests.
 	// There is no need in updating jsonlineRequestDuration for request errors,
@@ -90,7 +90,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	return true
 }
 
-func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (bool, error) {
+func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
 	var line []byte
 	for len(line) == 0 {
 		if !sc.Scan() {
@@ -117,11 +117,8 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
 		ts = time.Now().UnixNano()
 	}
 	p.RenameField(msgField, "_msg")
-	err = processLogMessage(ts, p.Fields)
+	processLogMessage(ts, p.Fields)
 	logjson.PutParser(p)
-	if err != nil {
-		return false, err
-	}
 
 	return true, nil
 }
diff --git a/app/vlinsert/jsonline/jsonline_test.go b/app/vlinsert/jsonline/jsonline_test.go
index f6da725c32..86a917491e 100644
--- a/app/vlinsert/jsonline/jsonline_test.go
+++ b/app/vlinsert/jsonline/jsonline_test.go
@@ -16,7 +16,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 
 		var timestamps []int64
 		var result string
-		processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
+		processLogMessage := func(timestamp int64, fields []logstorage.Field) {
 			timestamps = append(timestamps, timestamp)
 
 			a := make([]string, len(fields))
@@ -25,8 +25,6 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 			}
 			s := "{" + strings.Join(a, ",") + "}\n"
 			result += s
-
-			return nil
 		}
 
 		// Read the request without compression
diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go
index 914b664129..263b1781ff 100644
--- a/app/vlinsert/loki/loki_json.go
+++ b/app/vlinsert/loki/loki_json.go
@@ -47,21 +47,20 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
 		httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
 		return true
 	}
+	if err := vlstorage.CanWriteData(); err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return true
+	}
 	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
 	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 	n, err := parseJSONRequest(data, processLogMessage)
+	vlstorage.MustAddRows(lr)
+	logstorage.PutLogRows(lr)
 	if err != nil {
-		logstorage.PutLogRows(lr)
-		httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
+		httpserver.Errorf(w, r, "cannot parse Loki json request: %s", err)
 		return true
 	}
 
-	err = vlstorage.AddRows(lr)
-	logstorage.PutLogRows(lr)
-	if err != nil {
-		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
-		return true
-	}
 	rowsIngestedJSONTotal.Add(n)
 
 	// update lokiRequestJSONDuration only for successfully parsed requests
@@ -78,7 +77,7 @@ var (
 	lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
 )
 
-func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
+func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
 	p := parserPool.Get()
 	defer parserPool.Put(p)
 	v, err := p.ParseBytes(data)
@@ -171,11 +170,7 @@ func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, field
 				Name:  "_msg",
 				Value: bytesutil.ToUnsafeString(msg),
 			})
-			err = processLogMessage(ts, fields)
-			if err != nil {
-				return rowsIngested, err
-			}
-
+			processLogMessage(ts, fields)
 		}
 		rowsIngested += len(lines)
 	}
diff --git a/app/vlinsert/loki/loki_json_test.go b/app/vlinsert/loki/loki_json_test.go
index f285dd1f7c..93cf8652ad 100644
--- a/app/vlinsert/loki/loki_json_test.go
+++ b/app/vlinsert/loki/loki_json_test.go
@@ -11,9 +11,8 @@ import (
 func TestParseJSONRequestFailure(t *testing.T) {
 	f := func(s string) {
 		t.Helper()
-		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
+		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
 			t.Fatalf("unexpected call to parseJSONRequest callback!")
-			return nil
 		})
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
@@ -61,14 +60,13 @@ func TestParseJSONRequestSuccess(t *testing.T) {
 	f := func(s string, resultExpected string) {
 		t.Helper()
 		var lines []string
-		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
+		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
 			var a []string
 			for _, f := range fields {
 				a = append(a, f.String())
 			}
 			line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
 			lines = append(lines, line)
-			return nil
 		})
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
diff --git a/app/vlinsert/loki/loki_json_timing_test.go b/app/vlinsert/loki/loki_json_timing_test.go
index 37d922fc02..9c51f593a1 100644
--- a/app/vlinsert/loki/loki_json_timing_test.go
+++ b/app/vlinsert/loki/loki_json_timing_test.go
@@ -27,7 +27,7 @@ func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		data := getJSONBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) error { return nil })
+			_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {})
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %s", err))
 			}
diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go
index eb262dcc80..af489329b9 100644
--- a/app/vlinsert/loki/loki_protobuf.go
+++ b/app/vlinsert/loki/loki_protobuf.go
@@ -39,19 +39,17 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
 		httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
 		return true
 	}
+	if err := vlstorage.CanWriteData(); err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return true
+	}
 	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
 	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 	n, err := parseProtobufRequest(data, processLogMessage)
-	if err != nil {
-		logstorage.PutLogRows(lr)
-		httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
-		return true
-	}
-
-	err = vlstorage.AddRows(lr)
+	vlstorage.MustAddRows(lr)
 	logstorage.PutLogRows(lr)
 	if err != nil {
-		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
+		httpserver.Errorf(w, r, "cannot parse Loki protobuf request: %s", err)
 		return true
 	}
 
@@ -71,7 +69,7 @@ var (
 	lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
 )
 
-func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
+func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
 	bb := bytesBufPool.Get()
 	defer bytesBufPool.Put(bb)
 
@@ -114,10 +112,7 @@ func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, f
 			if ts == 0 {
 				ts = currentTimestamp
 			}
-			err = processLogMessage(ts, fields)
-			if err != nil {
-				return rowsIngested, err
-			}
+			processLogMessage(ts, fields)
 		}
 		rowsIngested += len(stream.Entries)
 	}
diff --git a/app/vlinsert/loki/loki_protobuf_test.go b/app/vlinsert/loki/loki_protobuf_test.go
index cc259bce58..f6eb5f0ec2 100644
--- a/app/vlinsert/loki/loki_protobuf_test.go
+++ b/app/vlinsert/loki/loki_protobuf_test.go
@@ -14,7 +14,7 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
 	f := func(s string, resultExpected string) {
 		t.Helper()
 		var pr PushRequest
-		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
+		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
 			msg := ""
 			for _, f := range fields {
 				if f.Name == "_msg" {
@@ -39,7 +39,6 @@
 					},
 				},
 			})
-			return nil
 		})
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
@@ -55,14 +54,13 @@
 		encodedData := snappy.Encode(nil, data)
 
 		var lines []string
-		n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) error {
+		n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) {
 			var a []string
 			for _, f := range fields {
 				a = append(a, f.String())
 			}
 			line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
 			lines = append(lines, line)
-			return nil
 		})
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
diff --git a/app/vlinsert/loki/loki_protobuf_timing_test.go b/app/vlinsert/loki/loki_protobuf_timing_test.go
index 230ab7a472..1b7cbd6476 100644
--- a/app/vlinsert/loki/loki_protobuf_timing_test.go
+++ b/app/vlinsert/loki/loki_protobuf_timing_test.go
@@ -29,7 +29,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		body := getProtobufBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) error { return nil })
+			_, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {})
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %s", err))
 			}
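All four ingestion handlers above (Elasticsearch `/_bulk`, jsonline, Loki JSON and Loki protobuf) now follow the same flow: reject the request up front while the storage is read-only, parse into `LogRows` through a `processLogMessage` callback that no longer returns an error, then unconditionally flush with `MustAddRows`. A condensed sketch of that flow, built only from the exported functions shown in the hunks above (`parseRequest` is a hypothetical placeholder for the format-specific parser, not a function in the repository):

```go
package example

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

// requestHandlerSketch mirrors the shape of the vlinsert handlers after this patch.
func requestHandlerSketch(w http.ResponseWriter, r *http.Request,
	parseRequest func(r *http.Request, processLogMessage func(timestamp int64, fields []logstorage.Field)) error,
) bool {
	cp, err := insertutils.GetCommonParams(r)
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return true
	}
	// Fail fast with 429 while the storage has no free disk space left.
	if err := vlstorage.CanWriteData(); err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return true
	}
	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
	processLogMessage := cp.GetProcessLogMessageFunc(lr)

	parseErr := parseRequest(r, processLogMessage)

	// Rows parsed before a mid-request failure are still ingested:
	// MustAddRows runs before the parse error is checked, matching the handlers above.
	vlstorage.MustAddRows(lr)
	logstorage.PutLogRows(lr)

	if parseErr != nil {
		httpserver.Errorf(w, r, "cannot parse request: %s", parseErr)
	}
	return true
}
```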
diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go
index 4533e7b25b..b1b55675c9 100644
--- a/app/vlstorage/main.go
+++ b/app/vlstorage/main.go
@@ -3,6 +3,7 @@ package vlstorage
 import (
 	"flag"
 	"fmt"
+	"net/http"
 	"sync"
 	"time"
 
@@ -10,6 +11,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
@@ -30,7 +32,8 @@ var (
 		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields ; see also -logIngestedRows")
 	logIngestedRows = flag.Bool("logIngestedRows", false, "Whether to log all the ingested log entries; this can be useful for debugging of data ingestion; "+
 		"see https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/ ; see also -logNewStreams")
-	minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
+	minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which "+
+		"the storage stops accepting new data")
 )
 
 // Init initializes vlstorage.
@@ -77,9 +80,23 @@ func Stop() {
 var strg *logstorage.Storage
 var storageMetrics *metrics.Set
 
-// AddRows adds lr to vlstorage
-func AddRows(lr *logstorage.LogRows) error {
-	return strg.AddRows(lr)
+// CanWriteData returns non-nil error if it cannot write data to vlstorage.
+func CanWriteData() error {
+	if strg.IsReadOnly() {
+		return &httpserver.ErrorWithStatusCode{
+			Err: fmt.Errorf("cannot add rows into storage in read-only mode; the storage can be in read-only mode "+
+				"because of lack of free disk space at -storageDataPath=%s", *storageDataPath),
+			StatusCode: http.StatusTooManyRequests,
+		}
+	}
+	return nil
+}
+
+// MustAddRows adds lr to vlstorage
+//
+// It is advised to call CanWriteData() before calling MustAddRows()
+func MustAddRows(lr *logstorage.LogRows) {
+	strg.MustAddRows(lr)
 }
 
 // RunQuery runs the given q and calls processBlock for the returned data blocks
@@ -114,7 +131,6 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
 		if m().IsReadOnly {
 			return 1
 		}
-
 		return 0
 	})
 
diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go
index a3fe6423d0..d5b42b62ee 100644
--- a/lib/logstorage/datadb.go
+++ b/lib/logstorage/datadb.go
@@ -2,8 +2,8 @@ package logstorage
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
 	"sort"
@@ -70,18 +70,15 @@ type datadb struct {
 	// stopCh is used for notifying background workers to stop
 	stopCh chan struct{}
 
-	// inmemoryPartsFlushersCount is the number of currently running in-memory parts flushers
+	// oldInmemoryPartsFlushersCount is the number of currently running flushers for old in-memory parts
 	//
 	// This variable must be accessed under partsLock.
-	inmemoryPartsFlushersCount int
+	oldInmemoryPartsFlushersCount int
 
 	// mergeWorkersCount is the number of currently running merge workers
 	//
 	// This variable must be accessed under partsLock.
 	mergeWorkersCount int
-
-	// isReadOnly indicates whether the storage is in read-only mode.
-	isReadOnly *uint32
 }
 
 // partWrapper is a wrapper for opened part.
@@ -141,7 +138,7 @@ func mustCreateDatadb(path string) {
 }
 
 // mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data.
-func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration, isReadOnly *uint32) *datadb {
+func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *datadb {
 	// Remove temporary directories, which may be left after unclean shutdown.
 	fs.MustRemoveTemporaryDirs(path)
 
@@ -173,7 +170,6 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration, isR
 		path:      path,
 		fileParts: pws,
 		stopCh:    make(chan struct{}),
-		isReadOnly: isReadOnly,
 	}
 
 	// Start merge workers in the hope they'll merge the remaining parts
@@ -187,51 +183,65 @@
 	return ddb
 }
 
-// startInmemoryPartsFlusherLocked starts flusher for in-memory parts to disk.
+// startOldInmemoryPartsFlusherLocked starts flusher for old in-memory parts to disk.
 //
 // This function must be called under partsLock.
-func (ddb *datadb) startInmemoryPartsFlusherLocked() {
+func (ddb *datadb) startOldInmemoryPartsFlusherLocked() {
 	maxWorkers := getMergeWorkersCount()
-	if ddb.inmemoryPartsFlushersCount >= maxWorkers {
+	if ddb.oldInmemoryPartsFlushersCount >= maxWorkers {
 		return
 	}
-	ddb.inmemoryPartsFlushersCount++
+	ddb.oldInmemoryPartsFlushersCount++
 	ddb.wg.Add(1)
 	go func() {
-		ddb.flushInmemoryParts()
+		ddb.flushOldInmemoryParts()
 		ddb.wg.Done()
 	}()
 }
 
-func (ddb *datadb) flushInmemoryParts() {
+func (ddb *datadb) flushOldInmemoryParts() {
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
 
-	for {
+	var parts, partsToMerge []*partWrapper
+
+	for !needStop(ddb.stopCh) {
 		ddb.partsLock.Lock()
-		pws := make([]*partWrapper, 0, len(ddb.inmemoryParts))
-		pws = appendNotInMergePartsLocked(pws, ddb.inmemoryParts)
+		parts = appendNotInMergePartsLocked(parts[:0], ddb.inmemoryParts)
 		currentTime := time.Now()
-		partsToFlush := pws[:0]
-		for _, pw := range pws {
+		partsToFlush := parts[:0]
+		for _, pw := range parts {
 			if pw.flushDeadline.Before(currentTime) {
 				partsToFlush = append(partsToFlush, pw)
 			}
 		}
-		setInMergeLocked(partsToFlush)
-		if len(pws) == 0 {
-			ddb.inmemoryPartsFlushersCount--
+		// Do not take into account available disk space when flushing in-memory parts to disk,
+		// since otherwise the outdated in-memory parts may remain in-memory, which, in turn,
+		// may result in increased memory usage plus possible loss of historical data.
+		// It is better to crash on out of disk error in this case.
+		partsToMerge = appendPartsToMerge(partsToMerge[:0], partsToFlush, math.MaxUint64)
+		if len(partsToMerge) == 0 {
+			partsToMerge = append(partsToMerge[:0], partsToFlush...)
+		}
+		setInMergeLocked(partsToMerge)
+		needStop := false
+		if len(ddb.inmemoryParts) == 0 {
+			// There are no in-memory parts, so stop the flusher.
+			needStop = true
+			ddb.oldInmemoryPartsFlushersCount--
 		}
 		ddb.partsLock.Unlock()
 
-		if len(pws) == 0 {
-			// There are no in-memory parts, so stop the flusher.
+		if needStop {
 			return
 		}
-		err := ddb.mergePartsFinal(partsToFlush)
-		if err != nil {
-			logger.Panicf("FATAL: cannot flush inmemory parts to disk: %s", err)
+
+		ddb.mustMergeParts(partsToMerge, true)
+		if len(partsToMerge) < len(partsToFlush) {
+			// Continue merging remaining old in-memory parts from partsToFlush list.
+			continue
 		}
+		// There are no old in-memory parts to flush. Sleep for a while until these parts appear.
 		select {
 		case <-ddb.stopCh:
 			return
@@ -244,9 +254,6 @@
 //
 // This function must be called under locked partsLock.
 func (ddb *datadb) startMergeWorkerLocked() {
-	if ddb.IsReadOnly() {
-		return
-	}
 	maxWorkers := getMergeWorkersCount()
 	if ddb.mergeWorkersCount >= maxWorkers {
 		return
 	}
@@ -255,11 +262,8 @@
 	ddb.wg.Add(1)
 	go func() {
 		globalMergeLimitCh <- struct{}{}
-		err := ddb.mergeExistingParts()
+		ddb.mustMergeExistingParts()
 		<-globalMergeLimitCh
-		if err != nil && !errors.Is(err, errReadOnly) {
-			logger.Panicf("FATAL: background merge failed: %s", err)
-		}
 		ddb.wg.Done()
 	}()
 }
@@ -279,9 +283,9 @@ func getMergeWorkersCount() int {
 	return n
 }
 
-func (ddb *datadb) mergeExistingParts() error {
+func (ddb *datadb) mustMergeExistingParts() {
 	for !needStop(ddb.stopCh) {
-		maxOutBytes := ddb.availableDiskSpace()
+		maxOutBytes := availableDiskSpace(ddb.path)
 
 		ddb.partsLock.Lock()
 		parts := make([]*partWrapper, 0, len(ddb.inmemoryParts)+len(ddb.fileParts))
@@ -296,25 +300,11 @@
 		ddb.partsLock.Unlock()
 
 		if len(pws) == 0 {
 			// Nothing to merge at the moment.
-			return nil
+			return
 		}
-		partsSize := getCompressedSize(pws)
-		if !ddb.reserveDiskSpace(partsSize) {
-			// There is no free disk space for the merge,
-			// because concurrent merge workers already reserved the disk space.
-			// Try again with smaller maxOutBytes.
-			ddb.releasePartsToMerge(pws)
-			continue
-		}
-		err := ddb.mergeParts(pws, false)
-		ddb.releaseDiskSpace(partsSize)
-		if err != nil {
-			return err
-		}
+		ddb.mustMergeParts(pws, false)
 	}
-
-	return nil
 }
 
 // appendNotInMergePartsLocked appends src parts with isInMerge=false to dst and returns the result.
@@ -349,29 +339,42 @@ func assertIsInMerge(pws []*partWrapper) {
 	}
 }
 
-var errReadOnly = errors.New("the storage is in read-only mode")
-
-// mergeParts merges pws to a single resulting part.
+// mustMergeParts merges pws to a single resulting part.
 //
-// if isFinal is set, then the resulting part will be saved to disk.
+// if isFinal is set, then the resulting part is guaranteed to be saved to disk.
+// The pws may remain unmerged after returning from the function if there is not enough disk space.
 //
 // All the parts inside pws must have isInMerge field set to true.
 // The isInMerge field inside pws parts is set to false before returning from the function.
-func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
+func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 	if len(pws) == 0 {
 		// Nothing to merge.
-		return nil
+		return
 	}
-	if ddb.IsReadOnly() {
-		return errReadOnly
-	}
 	assertIsInMerge(pws)
 	defer ddb.releasePartsToMerge(pws)
 
 	startTime := time.Now()
 
 	dstPartType := ddb.getDstPartType(pws, isFinal)
 
+	if dstPartType == partFile {
+		// Make sure there is enough disk space for performing the merge
+		partsSize := getCompressedSize(pws)
+		needReleaseDiskSpace := tryReserveDiskSpace(ddb.path, partsSize)
+		if needReleaseDiskSpace {
+			defer releaseDiskSpace(partsSize)
+		} else {
+			if !isFinal {
+				// There is not enough disk space for performing the non-final merge.
+				return
+			}
+			// Try performing the final merge even if there is not enough disk space
+			// in order to persist in-memory data to disk.
+			// It is better to crash on out of disk error in this case.
+		}
+	}
+
 	if dstPartType == partInmemory {
 		atomic.AddUint64(&ddb.inmemoryMergesTotal, 1)
 		atomic.AddUint64(&ddb.inmemoryActiveMerges, 1)
@@ -392,7 +395,7 @@
 		mp.MustStoreToDisk(dstPartPath)
 		pwNew := ddb.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
 		ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
-		return nil
+		return
 	}
 
 	// Prepare blockStreamReaders for source parts.
@@ -443,7 +446,7 @@
 		if dstPartType == partFile {
 			fs.MustRemoveAll(dstPartPath)
 		}
-		return nil
+		return
 	}
 
 	// Atomically swap the source parts with the newly created part.
@@ -463,7 +466,7 @@
 
 	d := time.Since(startTime)
 	if d <= 30*time.Second {
-		return nil
+		return
 	}
 
 	// Log stats for long merges.
@@ -471,7 +474,6 @@
 	rowsPerSec := int(float64(srcRowsCount) / durationSecs)
 	logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",
 		len(pws), srcRowsCount, srcBlocksCount, srcSize, dstRowsCount, dstBlocksCount, dstSize, durationSecs, rowsPerSec, dstPartPath)
-	return nil
 }
 
 func (ddb *datadb) nextMergeIdx() uint64 {
@@ -546,7 +548,7 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
 
 	ddb.partsLock.Lock()
 	ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
-	ddb.startInmemoryPartsFlusherLocked()
+	ddb.startOldInmemoryPartsFlusherLocked()
 	if len(ddb.inmemoryParts) > defaultPartsToMerge {
 		ddb.startMergeWorkerLocked()
 	}
@@ -559,9 +561,6 @@
 }
 
 func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool {
-	if ddb.IsReadOnly() {
-		return false
-	}
 	if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition {
 		return false
 	}
@@ -578,15 +577,16 @@ func (ddb *datadb) assistedMergeForInmemoryParts() {
 	ddb.partsLock.Lock()
 	parts := make([]*partWrapper, 0, len(ddb.inmemoryParts))
 	parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
-	pws := appendPartsToMerge(nil, parts, (1<<64)-1)
+	// Do not take into account available disk space when merging in-memory parts,
+	// since otherwise the outdated in-memory parts may remain in-memory, which, in turn,
+	// may result in increased memory usage plus possible loss of historical data.
+	// It is better to crash on out of disk error in this case.
+	pws := make([]*partWrapper, 0, len(parts))
+	pws = appendPartsToMerge(pws[:0], parts, math.MaxUint64)
 	setInMergeLocked(pws)
 	ddb.partsLock.Unlock()
 
-	err := ddb.mergeParts(pws, false)
-	if err == nil || errors.Is(err, errReadOnly) {
-		return
-	}
-	logger.Panicf("FATAL: cannot perform assisted merge for in-memory parts: %s", err)
+	ddb.mustMergeParts(pws, false)
 }
 
 // DatadbStats contains various stats for datadb.
@@ -675,12 +675,18 @@ func (ddb *datadb) debugFlush() {
 	// Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts.
 }
 
-func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
-	assertIsInMerge(pws)
+func (ddb *datadb) mustFlushInmemoryPartsToDisk() {
+	ddb.partsLock.Lock()
+	pws := append([]*partWrapper{}, ddb.inmemoryParts...)
+	setInMergeLocked(pws)
+	ddb.partsLock.Unlock()
 
 	var pwsChunk []*partWrapper
 	for len(pws) > 0 {
-		pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, (1<<64)-1)
+		// Do not take into account available disk space when performing the final flush of in-memory parts to disk,
+		// since otherwise these parts will be lost.
+		// It is better to crash on out of disk error in this case.
+		pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, math.MaxUint64)
 		if len(pwsChunk) == 0 {
 			pwsChunk = append(pwsChunk[:0], pws...)
 		}
@@ -691,13 +697,8 @@
 			logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
 		}
 
-		err := ddb.mergeParts(pwsChunk, true)
-		if err != nil {
-			ddb.releasePartsToMerge(pws)
-			return err
-		}
+		ddb.mustMergeParts(pwsChunk, true)
 	}
-	return nil
 }
 
 func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} {
@@ -725,7 +726,7 @@
 	switch dstPartType {
 	case partInmemory:
 		ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
-		ddb.startInmemoryPartsFlusherLocked()
+		ddb.startOldInmemoryPartsFlusherLocked()
 	case partFile:
 		ddb.fileParts = append(ddb.fileParts, pwNew)
 	default:
@@ -840,8 +841,8 @@
 	ddb.partsLock.Unlock()
 }
 
-func (ddb *datadb) availableDiskSpace() uint64 {
-	available := fs.MustGetFreeSpace(ddb.path)
+func availableDiskSpace(path string) uint64 {
+	available := fs.MustGetFreeSpace(path)
 	reserved := atomic.LoadUint64(&reservedDiskSpace)
 	if available < reserved {
 		return 0
@@ -849,22 +850,22 @@
 	return available - reserved
 }
 
-func (ddb *datadb) reserveDiskSpace(n uint64) bool {
-	available := fs.MustGetFreeSpace(ddb.path)
-	reserved := atomic.AddUint64(&reservedDiskSpace, n)
+func tryReserveDiskSpace(path string, n uint64) bool {
+	available := fs.MustGetFreeSpace(path)
+	reserved := reserveDiskSpace(n)
 	if available > reserved {
 		return true
 	}
-	ddb.releaseDiskSpace(n)
+	releaseDiskSpace(n)
 	return false
 }
 
-func (ddb *datadb) releaseDiskSpace(n uint64) {
-	atomic.AddUint64(&reservedDiskSpace, -n)
+func reserveDiskSpace(n uint64) uint64 {
+	return atomic.AddUint64(&reservedDiskSpace, n)
 }
 
-func (ddb *datadb) IsReadOnly() bool {
-	return atomic.LoadUint32(ddb.isReadOnly) == 1
+func releaseDiskSpace(n uint64) {
+	atomic.AddUint64(&reservedDiskSpace, ^(n - 1))
 }
 
 // reservedDiskSpace tracks global reserved disk space for currently executed
@@ -889,22 +890,13 @@
 	ddb.wg.Wait()
 
 	// flush in-memory data to disk
-	pws := append([]*partWrapper{}, ddb.inmemoryParts...)
-	setInMergeLocked(pws)
-	err := ddb.mergePartsFinal(pws)
-	if err != nil {
-		logger.Fatalf("FATAL: cannot merge inmemory parts: %s", err)
-	}
-
-	// There is no need in using ddb.partsLock here, since nobody should acces ddb now.
-	for _, pw := range ddb.inmemoryParts {
-		pw.decRef()
-		if pw.refCount != 0 {
-			logger.Panicf("BUG: there are %d references to inmemoryPart", pw.refCount)
-		}
+	ddb.mustFlushInmemoryPartsToDisk()
+	if len(ddb.inmemoryParts) > 0 {
+		logger.Panicf("BUG: the number of in-memory parts must be zero after flushing them to disk; got %d", len(ddb.inmemoryParts))
 	}
 	ddb.inmemoryParts = nil
 
+	// close file parts
 	for _, pw := range ddb.fileParts {
 		pw.decRef()
 		if pw.refCount != 0 {
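With the read-only flag gone from datadb, the disk-space accounting above is handled by the package-level availableDiskSpace, tryReserveDiskSpace, reserveDiskSpace and releaseDiskSpace helpers, which share the global reservedDiskSpace counter. releaseDiskSpace subtracts from that uint64 counter by adding ^(n - 1), the standard sync/atomic idiom for unsigned subtraction. A minimal standalone illustration of the idiom (not code from the patch):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var reserved uint64

	// Reserve 100 bytes, then release 40 of them.
	atomic.AddUint64(&reserved, 100)
	n := uint64(40)
	// Adding ^(n-1) is equivalent to subtracting n, because uint64
	// arithmetic wraps around modulo 2^64 (see the sync/atomic docs).
	atomic.AddUint64(&reserved, ^(n - 1))

	fmt.Println(atomic.LoadUint64(&reserved)) // 60
}
```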
diff --git a/lib/logstorage/filters.go b/lib/logstorage/filters.go
index 168b40d547..2c75d65cff 100644
--- a/lib/logstorage/filters.go
+++ b/lib/logstorage/filters.go
@@ -2976,8 +2976,8 @@ func toUint64Clamp(f float64) uint64 {
 	if f < 0 {
 		return 0
 	}
-	if f > (1<<64)-1 {
-		return (1 << 64) - 1
+	if f > math.MaxUint64 {
+		return math.MaxUint64
 	}
 	return uint64(f)
 }
diff --git a/lib/logstorage/filters_test.go b/lib/logstorage/filters_test.go
index 289492045d..cf7d6e7827 100644
--- a/lib/logstorage/filters_test.go
+++ b/lib/logstorage/filters_test.go
@@ -9277,7 +9277,7 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
 		timestamp := int64(i) * 1e9
 		lr.MustAdd(tenantID, timestamp, fields)
 	}
-	_ = s.AddRows(lr)
+	s.MustAddRows(lr)
 	PutLogRows(lr)
 }
 
@@ -9291,6 +9291,6 @@ func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int6
 		})
 		lr.MustAdd(tenantID, timestamp, fields)
 	}
-	_ = s.AddRows(lr)
+	s.MustAddRows(lr)
 	PutLogRows(lr)
 }
diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go
index 789dab6a34..ce759b85a8 100644
--- a/lib/logstorage/log_rows.go
+++ b/lib/logstorage/log_rows.go
@@ -7,7 +7,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
-// LogRows holds a set of rows needed for Storage.AddRows
+// LogRows holds a set of rows needed for Storage.MustAddRows
 //
 // LogRows must be obtained via GetLogRows()
 type LogRows struct {
diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go
index df609c3aa3..64465de209 100644
--- a/lib/logstorage/partition.go
+++ b/lib/logstorage/partition.go
@@ -77,7 +77,7 @@ func mustOpenPartition(s *Storage, path string) *partition {
 
 	// Open datadb
 	datadbPath := filepath.Join(path, datadbDirname)
-	pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval, &s.isReadOnly)
+	pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval)
 
 	return pt
 }
diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go
index 341ec46c18..96cf041a02 100644
--- a/lib/logstorage/storage.go
+++ b/lib/logstorage/storage.go
@@ -25,10 +25,11 @@ type StorageStats struct {
 	// PartitionsCount is the number of partitions in the storage
 	PartitionsCount uint64
 
-	PartitionStats
-
 	// IsReadOnly indicates whether the storage is read-only.
 	IsReadOnly bool
+
+	// PartitionStats contains partition stats.
+	PartitionStats
 }
 
 // Reset resets s.
@@ -51,6 +52,9 @@ type StorageConfig struct {
 	// Log entries with timestamps bigger than now+FutureRetention are ignored.
 	FutureRetention time.Duration
 
+	// MinFreeDiskSpaceBytes is the minimum free disk space at storage path after which the storage stops accepting new data.
+	MinFreeDiskSpaceBytes int64
+
 	// LogNewStreams indicates whether to log newly created log streams.
 	//
 	// This can be useful for debugging of high cardinality issues.
@@ -61,9 +65,6 @@ type StorageConfig struct {
 	//
 	// This can be useful for debugging of data ingestion.
 	LogIngestedRows bool
-
-	// MinFreeDiskSpaceBytes is the minimum free disk space at -storageDataPath after which the storage stops accepting new data
-	MinFreeDiskSpaceBytes int64
 }
 
 // Storage is the storage for log entries.
@@ -85,6 +86,9 @@ type Storage struct {
 	// futureRetention is the maximum allowed interval to write data into the future
 	futureRetention time.Duration
 
+	// minFreeDiskSpaceBytes is the minimum free disk space at path after which the storage stops accepting new data
+	minFreeDiskSpaceBytes uint64
+
 	// logNewStreams instructs to log new streams if it is set to true
 	logNewStreams bool
 
@@ -132,10 +136,6 @@ type Storage struct {
 	//
 	// It reduces the load on persistent storage during querying by _stream:{...} filter.
 	streamFilterCache *workingsetcache.Cache
-
-	isReadOnly uint32
-
-	freeDiskSpaceWatcherWG sync.WaitGroup
 }
 
 type partitionWrapper struct {
@@ -225,6 +225,11 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
 		futureRetention = 24 * time.Hour
 	}
 
+	var minFreeDiskSpaceBytes uint64
+	if cfg.MinFreeDiskSpaceBytes >= 0 {
+		minFreeDiskSpaceBytes = uint64(cfg.MinFreeDiskSpaceBytes)
+	}
+
 	if !fs.IsPathExist(path) {
 		mustCreateStorage(path)
 	}
@@ -241,14 +246,15 @@
 	streamFilterCache := workingsetcache.New(mem / 10)
 
 	s := &Storage{
-		path:            path,
-		retention:       retention,
-		flushInterval:   flushInterval,
-		futureRetention: futureRetention,
-		logNewStreams:   cfg.LogNewStreams,
-		logIngestedRows: cfg.LogIngestedRows,
-		flockF:          flockF,
-		stopCh:          make(chan struct{}),
+		path:                  path,
+		retention:             retention,
+		flushInterval:         flushInterval,
+		futureRetention:       futureRetention,
+		minFreeDiskSpaceBytes: minFreeDiskSpaceBytes,
+		logNewStreams:         cfg.LogNewStreams,
+		logIngestedRows:       cfg.LogIngestedRows,
+		flockF:                flockF,
+		stopCh:                make(chan struct{}),
 
 		streamIDCache:   streamIDCache,
 		streamTagsCache: streamTagsCache,
@@ -298,7 +304,6 @@
 	s.partitions = ptws
 
 	s.runRetentionWatcher()
-	s.startFreeDiskSpaceWatcher(uint64(cfg.MinFreeDiskSpaceBytes))
 	return s
 }
 
@@ -368,7 +373,6 @@ func (s *Storage) MustClose() {
 	// Stop background workers
 	close(s.stopCh)
 	s.wg.Wait()
-	s.freeDiskSpaceWatcherWG.Wait()
 
 	// Close partitions
 	for _, pw := range s.partitions {
@@ -401,12 +405,11 @@
 	s.path = ""
 }
 
-// AddRows adds lr to s.
-func (s *Storage) AddRows(lr *LogRows) error {
-	if s.IsReadOnly() {
-		return errReadOnly
-	}
-
+// MustAddRows adds lr to s.
+//
+// It is recommended to check whether s is in read-only mode by calling IsReadOnly()
+// before calling MustAddRows.
+func (s *Storage) MustAddRows(lr *LogRows) {
 	// Fast path - try adding all the rows to the hot partition
 	s.partitionsLock.Lock()
 	ptwHot := s.ptwHot
@@ -419,7 +422,7 @@
 		if ptwHot.canAddAllRows(lr) {
 			ptwHot.pt.mustAddRows(lr)
 			ptwHot.decRef()
-			return nil
+			return
 		}
 		ptwHot.decRef()
 	}
@@ -463,7 +466,6 @@
 		ptw.decRef()
 		PutLogRows(lrPart)
 	}
-	return nil
 }
 
 var tooSmallTimestampLogger = logger.WithThrottler("too_small_timestamp", 5*time.Second)
@@ -532,44 +534,14 @@ func (s *Storage) UpdateStats(ss *StorageStats) {
 		ptw.pt.updateStats(&ss.PartitionStats)
 	}
 	s.partitionsLock.Unlock()
+	ss.IsReadOnly = s.IsReadOnly()
 }
 
-// IsReadOnly returns information is storage in read only mode
+// IsReadOnly returns true if s is in read-only mode.
 func (s *Storage) IsReadOnly() bool {
-	return atomic.LoadUint32(&s.isReadOnly) == 1
-}
-
-func (s *Storage) startFreeDiskSpaceWatcher(freeDiskSpaceLimitBytes uint64) {
-	f := func() {
-		freeSpaceBytes := fs.MustGetFreeSpace(s.path)
-		if freeSpaceBytes < freeDiskSpaceLimitBytes {
-			// Switch the storage to readonly mode if there is no enough free space left at s.path
-			logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
-				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
-			atomic.StoreUint32(&s.isReadOnly, 1)
-			return
-		}
-		if atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
-			logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
-				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
-		}
-	}
-	f()
-	s.freeDiskSpaceWatcherWG.Add(1)
-	go func() {
-		defer s.freeDiskSpaceWatcherWG.Done()
-		ticker := time.NewTicker(time.Second)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-s.stopCh:
-				return
-			case <-ticker.C:
-				f()
-			}
-		}
-	}()
+	available := fs.MustGetFreeSpace(s.path)
+	return available < s.minFreeDiskSpaceBytes
 }
 
 func (s *Storage) debugFlush() {
diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go
index d61035a5dc..63404838ce 100644
--- a/lib/logstorage/storage_search_test.go
+++ b/lib/logstorage/storage_search_test.go
@@ -70,7 +70,7 @@ func TestStorageRunQuery(t *testing.T) {
 				})
 				lr.MustAdd(tenantID, timestamp, fields)
 			}
-			_ = s.AddRows(lr)
+			s.MustAddRows(lr)
 			PutLogRows(lr)
 		}
 	}
@@ -366,7 +366,7 @@ func TestStorageSearch(t *testing.T) {
 				})
 				lr.MustAdd(tenantID, timestamp, fields)
 			}
-			_ = s.AddRows(lr)
+			s.MustAddRows(lr)
 			PutLogRows(lr)
 		}
 	}
diff --git a/lib/logstorage/storage_test.go b/lib/logstorage/storage_test.go
index 9951a6a4c5..193179bb17 100644
--- a/lib/logstorage/storage_test.go
+++ b/lib/logstorage/storage_test.go
@@ -32,7 +32,7 @@ func TestStorageMustAddRows(t *testing.T) {
 	lr := newTestLogRows(1, 1, 0)
 	lr.timestamps[0] = time.Now().UTC().UnixNano()
 	totalRowsCount += uint64(len(lr.timestamps))
-	_ = s.AddRows(lr)
+	s.MustAddRows(lr)
 	sStats.Reset()
 	s.UpdateStats(&sStats)
 	if n := sStats.RowsCount(); n != totalRowsCount {
@@ -56,7 +56,7 @@ func TestStorageMustAddRows(t *testing.T) {
 		lr.timestamps[i] = time.Now().UTC().UnixNano()
 	}
 	totalRowsCount += uint64(len(lr.timestamps))
-	_ = s.AddRows(lr)
+	s.MustAddRows(lr)
 	sStats.Reset()
 	s.UpdateStats(&sStats)
 	if n := sStats.RowsCount(); n != totalRowsCount {
@@ -80,7 +80,7 @@ func TestStorageMustAddRows(t *testing.T) {
 		now += nsecPerDay
 	}
 	totalRowsCount += uint64(len(lr.timestamps))
-	_ = s.AddRows(lr)
+	s.MustAddRows(lr)
 	sStats.Reset()
 	s.UpdateStats(&sStats)
 	if n := sStats.RowsCount(); n != totalRowsCount {
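Taken together, the storage.go changes replace the background free-disk-space watcher goroutine and its atomic isReadOnly flag with an on-demand check: IsReadOnly simply compares the current free space at the storage path against minFreeDiskSpaceBytes, and vlstorage.CanWriteData maps that condition to an HTTP 429 response before any request body is parsed. A minimal sketch of that shape (checkFreeDiskSpace is a hypothetical helper, not part of the patch; fs.MustGetFreeSpace is the repository function used above):

```go
package example

import (
	"fmt"
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)

// checkFreeDiskSpace samples the free disk space on demand and reports
// whether writes should be rejected, mirroring Storage.IsReadOnly plus
// vlstorage.CanWriteData after this patch.
func checkFreeDiskSpace(path string, minFreeDiskSpaceBytes uint64) (int, error) {
	if fs.MustGetFreeSpace(path) < minFreeDiskSpaceBytes {
		return http.StatusTooManyRequests, fmt.Errorf("the storage at %q is in read-only mode: less than %d bytes of free space left", path, minFreeDiskSpaceBytes)
	}
	return http.StatusOK, nil
}
```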