From dfdada055cf5b2b36e7a8dd78f807f8919a9822b Mon Sep 17 00:00:00 2001
From: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Date: Fri, 29 Sep 2023 13:55:38 +0400
Subject: [PATCH] lib/logstorage: switch to read-only mode when running out of
 disk space (#4945)

* lib/logstorage: switch to read-only mode when running out of disk space

Added support for the `-storage.minFreeDiskSpaceBytes` command-line flag to allow graceful handling of running out of disk space at `-storageDataPath`: the storage switches to read-only mode once the free space drops below the configured limit.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
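
A minimal Go sketch (illustrative only, not part of the diff) of how the ingestion
handlers below are expected to consume the error-returning vlstorage.AddRows:

package example

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

// handleIngest is a hypothetical handler; the real ones live in app/vlinsert.
func handleIngest(w http.ResponseWriter, r *http.Request, cp *insertutils.CommonParams) bool {
	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
	// ... parse the request body and call lr.MustAdd(cp.TenantID, timestamp, fields) per row ...
	err := vlstorage.AddRows(lr)
	logstorage.PutLogRows(lr) // always return lr to the pool, even on error
	if err != nil {
		// AddRows fails when the storage is read-only, i.e. the free disk space
		// at -storageDataPath dropped below -storage.minFreeDiskSpaceBytes.
		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
	}
	return true
}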

* lib/logstorage: fix error handling logic during merge

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
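
A stripped-down sketch (names taken from lib/logstorage/datadb.go in the diff below,
everything else is illustrative) of how merge errors are now propagated and how the
expected read-only condition is kept out of the error log:

package example

import (
	"errors"
	"log"
	"sync/atomic"
)

// errReadOnly mirrors the sentinel introduced in lib/logstorage/datadb.go.
var errReadOnly = errors.New("the storage is in read-only mode")

// datadb is a stand-in for lib/logstorage.datadb.
type datadb struct {
	isReadOnly *uint32 // shared with the owning Storage
}

// mergeParts refuses to merge while the storage is read-only and reports it
// via errReadOnly, so callers can back off without treating it as a failure.
func (ddb *datadb) mergeParts() error {
	if atomic.LoadUint32(ddb.isReadOnly) == 1 {
		return errReadOnly
	}
	// ... the actual merging of parts happens here ...
	return nil
}

// startMergeWorker shows the gating used by the merge workers: only real
// failures are logged; errReadOnly is silently ignored.
func (ddb *datadb) startMergeWorker() {
	go func() {
		if err := ddb.mergeParts(); err != nil && !errors.Is(err, errReadOnly) {
			log.Printf("cannot merge parts: %s", err)
		}
	}()
}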

* lib/logstorage: fix log level

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
---
 app/vlinsert/elasticsearch/elasticsearch.go   | 18 +++--
 .../elasticsearch/elasticsearch_test.go       |  6 +-
 .../elasticsearch_timing_test.go              |  2 +-
 app/vlinsert/insertutils/common_params.go     | 12 ++--
 app/vlinsert/jsonline/jsonline.go             | 14 +++-
 app/vlinsert/jsonline/jsonline_test.go        |  4 +-
 app/vlinsert/loki/loki_json.go                | 17 +++--
 app/vlinsert/loki/loki_json_test.go           |  6 +-
 app/vlinsert/loki/loki_json_timing_test.go    |  2 +-
 app/vlinsert/loki/loki_protobuf.go            | 17 +++--
 app/vlinsert/loki/loki_protobuf_test.go       |  6 +-
 .../loki/loki_protobuf_timing_test.go         |  5 +-
 app/vlstorage/main.go                         | 28 +++++---
 docs/VictoriaLogs/CHANGELOG.md                |  1 +
 docs/VictoriaLogs/README.md                   |  3 +
 lib/logstorage/datadb.go                      | 69 +++++++++++++++----
 lib/logstorage/filters_test.go                |  4 +-
 lib/logstorage/log_rows.go                    |  2 +-
 lib/logstorage/partition.go                   |  2 +-
 lib/logstorage/storage.go                     | 61 +++++++++++++++-
 lib/logstorage/storage_search_test.go         |  4 +-
 lib/logstorage/storage_test.go                |  6 +-
 22 files changed, 221 insertions(+), 68 deletions(-)

diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go
index a3ae68f19..d511d3729 100644
--- a/app/vlinsert/elasticsearch/elasticsearch.go
+++ b/app/vlinsert/elasticsearch/elasticsearch.go
@@ -12,6 +12,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
@@ -22,7 +24,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
-	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
@@ -101,8 +102,11 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 			logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
 			return true
 		}
-		vlstorage.MustAddRows(lr)
+		err = vlstorage.AddRows(lr)
 		logstorage.PutLogRows(lr)
+		if err != nil {
+			httpserver.Errorf(w, r, "cannot insert rows: %s", err)
+		}
 
 		tookMs := time.Since(startTime).Milliseconds()
 		bw := bufferedwriter.Get(w)
@@ -128,7 +132,7 @@ var (
 )
 
 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
-	processLogMessage func(timestamp int64, fields []logstorage.Field),
+	processLogMessage func(timestamp int64, fields []logstorage.Field) error,
 ) (int, error) {
 	// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 
@@ -171,7 +175,7 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 var lineBufferPool bytesutil.ByteBufferPool
 
 func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
-	processLogMessage func(timestamp int64, fields []logstorage.Field),
+	processLogMessage func(timestamp int64, fields []logstorage.Field) error,
 ) (bool, error) {
 	var line []byte
 
@@ -218,8 +222,12 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
 		ts = time.Now().UnixNano()
 	}
 	p.RenameField(msgField, "_msg")
-	processLogMessage(ts, p.Fields)
+	err = processLogMessage(ts, p.Fields)
 	logjson.PutParser(p)
+	if err != nil {
+		return false, err
+	}
+
 	return true, nil
 }
 
diff --git a/app/vlinsert/elasticsearch/elasticsearch_test.go b/app/vlinsert/elasticsearch/elasticsearch_test.go
index 09d1bf770..3935e0dee 100644
--- a/app/vlinsert/elasticsearch/elasticsearch_test.go
+++ b/app/vlinsert/elasticsearch/elasticsearch_test.go
@@ -15,8 +15,9 @@ func TestReadBulkRequestFailure(t *testing.T) {
 	f := func(data string) {
 		t.Helper()
 
-		processLogMessage := func(timestamp int64, fields []logstorage.Field) {
+		processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
 			t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
+			return nil
 		}
 
 		r := bytes.NewBufferString(data)
@@ -43,7 +44,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 
 		var timestamps []int64
 		var result string
-		processLogMessage := func(timestamp int64, fields []logstorage.Field) {
+		processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
 			timestamps = append(timestamps, timestamp)
 
 			a := make([]string, len(fields))
@@ -52,6 +53,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 			}
 			s := "{" + strings.Join(a, ",") + "}\n"
 			result += s
+			return nil
 		}
 
 		// Read the request without compression
diff --git a/app/vlinsert/elasticsearch/elasticsearch_timing_test.go b/app/vlinsert/elasticsearch/elasticsearch_timing_test.go
index 9a50fe0eb..5d8cca1b2 100644
--- a/app/vlinsert/elasticsearch/elasticsearch_timing_test.go
+++ b/app/vlinsert/elasticsearch/elasticsearch_timing_test.go
@@ -33,7 +33,7 @@ func benchmarkReadBulkRequest(b *testing.B, isGzip bool) {
 
 	timeField := "@timestamp"
 	msgField := "message"
-	processLogMessage := func(timestmap int64, fields []logstorage.Field) {}
+	processLogMessage := func(timestamp int64, fields []logstorage.Field) error { return nil }
 
 	b.ReportAllocs()
 	b.SetBytes(int64(len(data)))
diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go
index 23f100775..1852f2233 100644
--- a/app/vlinsert/insertutils/common_params.go
+++ b/app/vlinsert/insertutils/common_params.go
@@ -72,13 +72,13 @@ func GetCommonParams(r *http.Request) (*CommonParams, error) {
 }
 
 // GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
-func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
-	return func(timestamp int64, fields []logstorage.Field) {
+func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) error {
+	return func(timestamp int64, fields []logstorage.Field) error {
 		if len(fields) > *MaxFieldsPerLine {
 			rf := logstorage.RowFormatter(fields)
 			logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
 			rowsDroppedTotalTooManyFields.Inc()
-			return
+			return nil
 		}
 
 		lr.MustAdd(cp.TenantID, timestamp, fields)
@@ -87,12 +87,14 @@ func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(ti
 			lr.ResetKeepSettings()
 			logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
 			rowsDroppedTotalDebug.Inc()
-			return
+			return nil
 		}
 		if lr.NeedFlush() {
-			vlstorage.MustAddRows(lr)
+			err := vlstorage.AddRows(lr)
 			lr.ResetKeepSettings()
+			return err
 		}
+		return nil
 	}
 }
 
diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go
index bf8d4760e..863cf2047 100644
--- a/app/vlinsert/jsonline/jsonline.go
+++ b/app/vlinsert/jsonline/jsonline.go
@@ -75,8 +75,12 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		rowsIngestedTotal.Inc()
 	}
 
-	vlstorage.MustAddRows(lr)
+	err = vlstorage.AddRows(lr)
 	logstorage.PutLogRows(lr)
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
+		return true
+	}
 
 	// update jsonlineRequestDuration only for successfully parsed requests.
 	// There is no need in updating jsonlineRequestDuration for request errors,
@@ -86,7 +90,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	return true
 }
 
-func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field)) (bool, error) {
+func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (bool, error) {
 	var line []byte
 	for len(line) == 0 {
 		if !sc.Scan() {
@@ -113,8 +117,12 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
 		ts = time.Now().UnixNano()
 	}
 	p.RenameField(msgField, "_msg")
-	processLogMessage(ts, p.Fields)
+	err = processLogMessage(ts, p.Fields)
 	logjson.PutParser(p)
+	if err != nil {
+		return false, err
+	}
+
 	return true, nil
 }
 
diff --git a/app/vlinsert/jsonline/jsonline_test.go b/app/vlinsert/jsonline/jsonline_test.go
index 86a917491..f6da725c3 100644
--- a/app/vlinsert/jsonline/jsonline_test.go
+++ b/app/vlinsert/jsonline/jsonline_test.go
@@ -16,7 +16,7 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 
 		var timestamps []int64
 		var result string
-		processLogMessage := func(timestamp int64, fields []logstorage.Field) {
+		processLogMessage := func(timestamp int64, fields []logstorage.Field) error {
 			timestamps = append(timestamps, timestamp)
 
 			a := make([]string, len(fields))
@@ -25,6 +25,8 @@ func TestReadBulkRequestSuccess(t *testing.T) {
 			}
 			s := "{" + strings.Join(a, ",") + "}\n"
 			result += s
+
+			return nil
 		}
 
 		// Read the request without compression
diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go
index 88a75df1d..653416faf 100644
--- a/app/vlinsert/loki/loki_json.go
+++ b/app/vlinsert/loki/loki_json.go
@@ -50,10 +50,16 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
 	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
 	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 	n, err := parseJSONRequest(data, processLogMessage)
-	vlstorage.MustAddRows(lr)
+	if err != nil {
+		logstorage.PutLogRows(lr)
+		httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
+		return true
+	}
+
+	err = vlstorage.AddRows(lr)
 	logstorage.PutLogRows(lr)
 	if err != nil {
-		httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
+		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
 		return true
 	}
 	rowsIngestedJSONTotal.Add(n)
@@ -72,7 +78,7 @@ var (
 	lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
 )
 
-func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
+func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
 	p := parserPool.Get()
 	defer parserPool.Put(p)
 	v, err := p.ParseBytes(data)
@@ -165,7 +171,10 @@ func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, field
 				Name:  "_msg",
 				Value: bytesutil.ToUnsafeString(msg),
 			})
-			processLogMessage(ts, fields)
+			err = processLogMessage(ts, fields)
+			if err != nil {
+				return rowsIngested, err
+			}
 
 		}
 		rowsIngested += len(lines)
diff --git a/app/vlinsert/loki/loki_json_test.go b/app/vlinsert/loki/loki_json_test.go
index 93cf8652a..f285dd1f7 100644
--- a/app/vlinsert/loki/loki_json_test.go
+++ b/app/vlinsert/loki/loki_json_test.go
@@ -11,8 +11,9 @@ import (
 func TestParseJSONRequestFailure(t *testing.T) {
 	f := func(s string) {
 		t.Helper()
-		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
+		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
 			t.Fatalf("unexpected call to parseJSONRequest callback!")
+			return nil
 		})
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
@@ -60,13 +61,14 @@ func TestParseJSONRequestSuccess(t *testing.T) {
 	f := func(s string, resultExpected string) {
 		t.Helper()
 		var lines []string
-		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
+		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
 			var a []string
 			for _, f := range fields {
 				a = append(a, f.String())
 			}
 			line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
 			lines = append(lines, line)
+			return nil
 		})
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
diff --git a/app/vlinsert/loki/loki_json_timing_test.go b/app/vlinsert/loki/loki_json_timing_test.go
index 9c51f593a..37d922fc0 100644
--- a/app/vlinsert/loki/loki_json_timing_test.go
+++ b/app/vlinsert/loki/loki_json_timing_test.go
@@ -27,7 +27,7 @@ func benchmarkParseJSONRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		data := getJSONBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) {})
+			_, err := parseJSONRequest(data, func(timestamp int64, fields []logstorage.Field) error { return nil })
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %s", err))
 			}
diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go
index aa4e6b592..0e7aceac7 100644
--- a/app/vlinsert/loki/loki_protobuf.go
+++ b/app/vlinsert/loki/loki_protobuf.go
@@ -42,10 +42,16 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
 	lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
 	processLogMessage := cp.GetProcessLogMessageFunc(lr)
 	n, err := parseProtobufRequest(data, processLogMessage)
-	vlstorage.MustAddRows(lr)
+	if err != nil {
+		logstorage.PutLogRows(lr)
+		httpserver.Errorf(w, r, "cannot parse Loki request: %s", err)
+		return true
+	}
+
+	err = vlstorage.AddRows(lr)
 	logstorage.PutLogRows(lr)
 	if err != nil {
-		httpserver.Errorf(w, r, "cannot parse loki request: %s", err)
+		httpserver.Errorf(w, r, "cannot insert rows: %s", err)
 		return true
 	}
 
@@ -65,7 +71,7 @@ var (
 	lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
 )
 
-func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
+func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field) error) (int, error) {
 	bb := bytesBufPool.Get()
 	defer bytesBufPool.Put(bb)
 
@@ -108,7 +114,10 @@ func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, f
 			if ts == 0 {
 				ts = currentTimestamp
 			}
-			processLogMessage(ts, fields)
+			err = processLogMessage(ts, fields)
+			if err != nil {
+				return rowsIngested, err
+			}
 		}
 		rowsIngested += len(stream.Entries)
 	}
diff --git a/app/vlinsert/loki/loki_protobuf_test.go b/app/vlinsert/loki/loki_protobuf_test.go
index f6eb5f0ec..cc259bce5 100644
--- a/app/vlinsert/loki/loki_protobuf_test.go
+++ b/app/vlinsert/loki/loki_protobuf_test.go
@@ -14,7 +14,7 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
 	f := func(s string, resultExpected string) {
 		t.Helper()
 		var pr PushRequest
-		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) {
+		n, err := parseJSONRequest([]byte(s), func(timestamp int64, fields []logstorage.Field) error {
 			msg := ""
 			for _, f := range fields {
 				if f.Name == "_msg" {
@@ -39,6 +39,7 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
 					},
 				},
 			})
+			return nil
 		})
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
@@ -54,13 +55,14 @@ func TestParseProtobufRequestSuccess(t *testing.T) {
 		encodedData := snappy.Encode(nil, data)
 
 		var lines []string
-		n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) {
+		n, err = parseProtobufRequest(encodedData, func(timestamp int64, fields []logstorage.Field) error {
 			var a []string
 			for _, f := range fields {
 				a = append(a, f.String())
 			}
 			line := fmt.Sprintf("_time:%d %s", timestamp, strings.Join(a, " "))
 			lines = append(lines, line)
+			return nil
 		})
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
diff --git a/app/vlinsert/loki/loki_protobuf_timing_test.go b/app/vlinsert/loki/loki_protobuf_timing_test.go
index 18f5b89ef..230ab7a47 100644
--- a/app/vlinsert/loki/loki_protobuf_timing_test.go
+++ b/app/vlinsert/loki/loki_protobuf_timing_test.go
@@ -6,8 +6,9 @@ import (
 	"testing"
 	"time"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 	"github.com/golang/snappy"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
 func BenchmarkParseProtobufRequest(b *testing.B) {
@@ -28,7 +29,7 @@ func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
 	b.RunParallel(func(pb *testing.PB) {
 		body := getProtobufBody(streams, rows, labels)
 		for pb.Next() {
-			_, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) {})
+			_, err := parseProtobufRequest(body, func(timestamp int64, fields []logstorage.Field) error { return nil })
 			if err != nil {
 				panic(fmt.Errorf("unexpected error: %s", err))
 			}
diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go
index 0a6c9b55a..4533e7b25 100644
--- a/app/vlstorage/main.go
+++ b/app/vlstorage/main.go
@@ -6,11 +6,12 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
-	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
@@ -29,6 +30,7 @@ var (
 		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields ; see also -logIngestedRows")
 	logIngestedRows = flag.Bool("logIngestedRows", false, "Whether to log all the ingested log entries; this can be useful for debugging of data ingestion; "+
 		"see https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/ ; see also -logNewStreams")
+	minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
 )
 
 // Init initializes vlstorage.
@@ -43,11 +45,12 @@ func Init() {
 		logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)
 	}
 	cfg := &logstorage.StorageConfig{
-		Retention:       retentionPeriod.Duration(),
-		FlushInterval:   *inmemoryDataFlushInterval,
-		FutureRetention: futureRetention.Duration(),
-		LogNewStreams:   *logNewStreams,
-		LogIngestedRows: *logIngestedRows,
+		Retention:             retentionPeriod.Duration(),
+		FlushInterval:         *inmemoryDataFlushInterval,
+		FutureRetention:       futureRetention.Duration(),
+		LogNewStreams:         *logNewStreams,
+		LogIngestedRows:       *logIngestedRows,
+		MinFreeDiskSpaceBytes: minFreeDiskSpaceBytes.N,
 	}
 	logger.Infof("opening storage at -storageDataPath=%s", *storageDataPath)
 	startTime := time.Now()
@@ -74,9 +77,9 @@ func Stop() {
 var strg *logstorage.Storage
 var storageMetrics *metrics.Set
 
-// MustAddRows adds lr to vlstorage
-func MustAddRows(lr *logstorage.LogRows) {
-	strg.MustAddRows(lr)
+// AddRows adds lr to vlstorage
+func AddRows(lr *logstorage.LogRows) error {
+	return strg.AddRows(lr)
 }
 
 // RunQuery runs the given q and calls processBlock for the returned data blocks
@@ -107,6 +110,13 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
 	ms.NewGauge(fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), func() float64 {
 		return float64(fs.MustGetFreeSpace(*storageDataPath))
 	})
+	ms.NewGauge(fmt.Sprintf(`vl_storage_is_read_only{path=%q}`, *storageDataPath), func() float64 {
+		if m().IsReadOnly {
+			return 1
+		}
+
+		return 0
+	})
 
 	ms.NewGauge(`vl_active_merges{type="inmemory"}`, func() float64 {
 		return float64(m().InmemoryActiveMerges)
diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index f2bb6d29a..8f40aec7d 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -10,6 +10,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
   * `vl_data_size_bytes{type="storage"}` - on-disk size for data excluding [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
   * `vl_data_size_bytes{type="indexdb"}` - on-disk size for [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
 * FEATURE: add `-insert.maxFieldsPerLine` command-line flag, which can be used for limiting the number of fields per line in logs sent to VictoriaLogs via ingestion protocols. This helps to avoid issues like [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
+* FEATURE: add support for the `-storage.minFreeDiskSpaceBytes` command-line flag, which switches the storage to read-only mode when it runs out of disk space at `-storageDataPath`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737).
 * FEATURE: expose `vl_http_request_duration_seconds` histogram at the [/metrics](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) page. Thanks to @crossoverJie for [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934).
 
 * BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue.
diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md
index 22c722cc1..ba8e2c598 100644
--- a/docs/VictoriaLogs/README.md
+++ b/docs/VictoriaLogs/README.md
@@ -239,6 +239,9 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
     	Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1048576)
   -storageDataPath string
     	Path to directory with the VictoriaLogs data; see https://docs.victoriametrics.com/VictoriaLogs/#storage (default "victoria-logs-data")
+  -storage.minFreeDiskSpaceBytes size
+    	The minimum free disk space at -storageDataPath after which the storage stops accepting new data
+    	Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 10000000)
   -tls
     	Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
   -tlsCertFile string
diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go
index d3afc82f1..d2fd8d5c5 100644
--- a/lib/logstorage/datadb.go
+++ b/lib/logstorage/datadb.go
@@ -2,6 +2,7 @@ package logstorage
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -78,6 +79,9 @@ type datadb struct {
 	//
 	// This variable must be accessed under partsLock.
 	mergeWorkersCount int
+
+	// isReadOnly indicates whether the storage is in read-only mode.
+	isReadOnly *uint32
 }
 
 // partWrapper is a wrapper for opened part.
@@ -137,7 +141,7 @@ func mustCreateDatadb(path string) {
 }
 
 // mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data.
-func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *datadb {
+func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration, isReadOnly *uint32) *datadb {
 	// Remove temporary directories, which may be left after unclean shutdown.
 	fs.MustRemoveTemporaryDirs(path)
 
@@ -169,6 +173,7 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
 		path:          path,
 		fileParts:     pws,
 		stopCh:        make(chan struct{}),
+		isReadOnly:    isReadOnly,
 	}
 
 	// Start merge workers in the hope they'll merge the remaining parts
@@ -221,7 +226,10 @@ func (ddb *datadb) flushInmemoryParts() {
 			// There are no in-memory parts, so stop the flusher.
 			return
 		}
-		ddb.mustMergePartsFinal(partsToFlush)
+		err := ddb.mergePartsFinal(partsToFlush)
+		if err != nil {
+			logger.Errorf("cannot flush inmemory parts to disk: %s", err)
+		}
 
 		select {
 		case <-ddb.stopCh:
@@ -235,6 +243,9 @@ func (ddb *datadb) flushInmemoryParts() {
 //
 // This function must be called under locked partsLock.
 func (ddb *datadb) startMergeWorkerLocked() {
+	if ddb.IsReadOnly() {
+		return
+	}
 	if ddb.mergeWorkersCount >= getMergeWorkersCount() {
 		return
 	}
@@ -242,8 +253,11 @@ func (ddb *datadb) startMergeWorkerLocked() {
 	ddb.wg.Add(1)
 	go func() {
 		globalMergeLimitCh <- struct{}{}
-		ddb.mustMergeExistingParts()
+		err := ddb.mergeExistingParts()
 		<-globalMergeLimitCh
+		if err != nil && !errors.Is(err, errReadOnly) {
+			logger.Errorf("cannot merge parts: %s", err)
+		}
 		ddb.wg.Done()
 	}()
 }
@@ -263,7 +277,7 @@ func getMergeWorkersCount() int {
 	return n
 }
 
-func (ddb *datadb) mustMergeExistingParts() {
+func (ddb *datadb) mergeExistingParts() error {
 	for !needStop(ddb.stopCh) {
 		maxOutBytes := ddb.availableDiskSpace()
 
@@ -280,7 +294,7 @@ func (ddb *datadb) mustMergeExistingParts() {
 
 		if len(pws) == 0 {
 			// Nothing to merge at the moment.
-			return
+			return nil
 		}
 
 		partsSize := getCompressedSize(pws)
@@ -291,9 +305,15 @@ func (ddb *datadb) mustMergeExistingParts() {
 			ddb.releasePartsToMerge(pws)
 			continue
 		}
-		ddb.mustMergeParts(pws, false)
+		err := ddb.mergeParts(pws, false)
 		ddb.releaseDiskSpace(partsSize)
+		ddb.releasePartsToMerge(pws)
+		if err != nil {
+			return err
+		}
 	}
+
+	return nil
 }
 
 // appendNotInMergePartsLocked appends src parts with isInMerge=false to dst and returns the result.
@@ -328,15 +348,21 @@ func assertIsInMerge(pws []*partWrapper) {
 	}
 }
 
-// mustMergeParts merges pws to a single resulting part.
+var errReadOnly = errors.New("the storage is in read-only mode")
+
+// mergeParts merges pws to a single resulting part.
 //
 // if isFinal is set, then the resulting part will be saved to disk.
 //
 // All the parts inside pws must have isInMerge field set to true.
-func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
+func (ddb *datadb) mergeParts(pws []*partWrapper, isFinal bool) error {
+	if ddb.IsReadOnly() {
+		return errReadOnly
+	}
+
 	if len(pws) == 0 {
 		// Nothing to merge.
-		return
+		return nil
 	}
 	assertIsInMerge(pws)
 
@@ -363,7 +389,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 		mp.MustStoreToDisk(dstPartPath)
 		pwNew := ddb.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
 		ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
-		return
+		return nil
 	}
 
 	// Prepare blockStreamReaders for source parts.
@@ -415,7 +441,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 		if dstPartType == partFile {
 			fs.MustRemoveAll(dstPartPath)
 		}
-		return
+		return nil
 	}
 
 	// Atomically swap the source parts with the newly created part.
@@ -435,7 +461,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 
 	d := time.Since(startTime)
 	if d <= 30*time.Second {
-		return
+		return nil
 	}
 
 	// Log stats for long merges.
@@ -443,6 +469,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 	rowsPerSec := int(float64(srcRowsCount) / durationSecs)
 	logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",
 		len(pws), srcRowsCount, srcBlocksCount, srcSize, dstRowsCount, dstBlocksCount, dstSize, durationSecs, rowsPerSec, dstPartPath)
+	return nil
 }
 
 func (ddb *datadb) nextMergeIdx() uint64 {
@@ -610,7 +637,7 @@ func (ddb *datadb) debugFlush() {
 	// Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts.
 }
 
-func (ddb *datadb) mustMergePartsFinal(pws []*partWrapper) {
+func (ddb *datadb) mergePartsFinal(pws []*partWrapper) error {
 	assertIsInMerge(pws)
 
 	var pwsChunk []*partWrapper
@@ -619,7 +646,11 @@ func (ddb *datadb) mustMergePartsFinal(pws []*partWrapper) {
 		if len(pwsChunk) == 0 {
 			pwsChunk = append(pwsChunk[:0], pws...)
 		}
-		ddb.mustMergeParts(pwsChunk, true)
+		err := ddb.mergeParts(pwsChunk, true)
+		if err != nil {
+			ddb.releasePartsToMerge(pwsChunk)
+			return err
+		}
 
 		partsToRemove := partsToMap(pwsChunk)
 		removedParts := 0
@@ -628,6 +659,7 @@ func (ddb *datadb) mustMergePartsFinal(pws []*partWrapper) {
 			logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
 		}
 	}
+	return nil
 }
 
 func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} {
@@ -793,6 +825,10 @@ func (ddb *datadb) releaseDiskSpace(n uint64) {
 	atomic.AddUint64(&reservedDiskSpace, -n)
 }
 
+func (ddb *datadb) IsReadOnly() bool {
+	return atomic.LoadUint32(ddb.isReadOnly) == 1
+}
+
 // reservedDiskSpace tracks global reserved disk space for currently executed
 // background merges across all the partitions.
 //
@@ -817,7 +853,10 @@ func mustCloseDatadb(ddb *datadb) {
 	// flush in-memory data to disk
 	pws := append([]*partWrapper{}, ddb.inmemoryParts...)
 	setInMergeLocked(pws)
-	ddb.mustMergePartsFinal(pws)
+	err := ddb.mergePartsFinal(pws)
+	if err != nil {
+		logger.Fatalf("FATAL: cannot merge inmemory parts: %s", err)
+	}
 
 	// There is no need in using ddb.partsLock here, since nobody should acces ddb now.
 	for _, pw := range ddb.inmemoryParts {
diff --git a/lib/logstorage/filters_test.go b/lib/logstorage/filters_test.go
index cf7d6e782..289492045 100644
--- a/lib/logstorage/filters_test.go
+++ b/lib/logstorage/filters_test.go
@@ -9277,7 +9277,7 @@ func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {
 		timestamp := int64(i) * 1e9
 		lr.MustAdd(tenantID, timestamp, fields)
 	}
-	s.MustAddRows(lr)
+	_ = s.AddRows(lr)
 	PutLogRows(lr)
 }
 
@@ -9291,6 +9291,6 @@ func generateRowsFromTimestamps(s *Storage, tenantID TenantID, timestamps []int6
 		})
 		lr.MustAdd(tenantID, timestamp, fields)
 	}
-	s.MustAddRows(lr)
+	_ = s.AddRows(lr)
 	PutLogRows(lr)
 }
diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go
index ce759b85a..789dab6a3 100644
--- a/lib/logstorage/log_rows.go
+++ b/lib/logstorage/log_rows.go
@@ -7,7 +7,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
-// LogRows holds a set of rows needed for Storage.MustAddRows
+// LogRows holds a set of rows needed for Storage.AddRows
 //
 // LogRows must be obtained via GetLogRows()
 type LogRows struct {
diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go
index 64465de20..df609c3aa 100644
--- a/lib/logstorage/partition.go
+++ b/lib/logstorage/partition.go
@@ -77,7 +77,7 @@ func mustOpenPartition(s *Storage, path string) *partition {
 
 	// Open datadb
 	datadbPath := filepath.Join(path, datadbDirname)
-	pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval)
+	pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval, &s.isReadOnly)
 
 	return pt
 }
diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go
index 9d840fb5b..341ec46c1 100644
--- a/lib/logstorage/storage.go
+++ b/lib/logstorage/storage.go
@@ -26,6 +26,9 @@ type StorageStats struct {
 	PartitionsCount uint64
 
 	PartitionStats
+
+	// IsReadOnly indicates whether the storage is read-only.
+	IsReadOnly bool
 }
 
 // Reset resets s.
@@ -58,6 +61,9 @@ type StorageConfig struct {
 	//
 	// This can be useful for debugging of data ingestion.
 	LogIngestedRows bool
+
+	// MinFreeDiskSpaceBytes is the minimum free disk space at -storageDataPath after which the storage stops accepting new data
+	MinFreeDiskSpaceBytes int64
 }
 
 // Storage is the storage for log entries.
@@ -126,6 +132,10 @@ type Storage struct {
 	//
 	// It reduces the load on persistent storage during querying by _stream:{...} filter.
 	streamFilterCache *workingsetcache.Cache
+
+	isReadOnly uint32
+
+	freeDiskSpaceWatcherWG sync.WaitGroup
 }
 
 type partitionWrapper struct {
@@ -288,6 +298,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
 
 	s.partitions = ptws
 	s.runRetentionWatcher()
+	s.startFreeDiskSpaceWatcher(uint64(cfg.MinFreeDiskSpaceBytes))
 	return s
 }
 
@@ -357,6 +368,7 @@ func (s *Storage) MustClose() {
 	// Stop background workers
 	close(s.stopCh)
 	s.wg.Wait()
+	s.freeDiskSpaceWatcherWG.Wait()
 
 	// Close partitions
 	for _, pw := range s.partitions {
@@ -389,8 +401,12 @@ func (s *Storage) MustClose() {
 	s.path = ""
 }
 
-// MustAddRows adds lr to s.
-func (s *Storage) MustAddRows(lr *LogRows) {
+// AddRows adds lr to s.
+func (s *Storage) AddRows(lr *LogRows) error {
+	if s.IsReadOnly() {
+		return errReadOnly
+	}
+
 	// Fast path - try adding all the rows to the hot partition
 	s.partitionsLock.Lock()
 	ptwHot := s.ptwHot
@@ -403,7 +419,7 @@ func (s *Storage) MustAddRows(lr *LogRows) {
 		if ptwHot.canAddAllRows(lr) {
 			ptwHot.pt.mustAddRows(lr)
 			ptwHot.decRef()
-			return
+			return nil
 		}
 		ptwHot.decRef()
 	}
@@ -447,6 +463,7 @@ func (s *Storage) MustAddRows(lr *LogRows) {
 		ptw.decRef()
 		PutLogRows(lrPart)
 	}
+	return nil
 }
 
 var tooSmallTimestampLogger = logger.WithThrottler("too_small_timestamp", 5*time.Second)
@@ -515,6 +532,44 @@ func (s *Storage) UpdateStats(ss *StorageStats) {
 		ptw.pt.updateStats(&ss.PartitionStats)
 	}
 	s.partitionsLock.Unlock()
+	ss.IsReadOnly = s.IsReadOnly()
+}
+
+// IsReadOnly returns true if the storage is in read-only mode.
+func (s *Storage) IsReadOnly() bool {
+	return atomic.LoadUint32(&s.isReadOnly) == 1
+}
+
+func (s *Storage) startFreeDiskSpaceWatcher(freeDiskSpaceLimitBytes uint64) {
+	f := func() {
+		freeSpaceBytes := fs.MustGetFreeSpace(s.path)
+		if freeSpaceBytes < freeDiskSpaceLimitBytes {
+			// Switch the storage to read-only mode if there is not enough free space left at s.path
+			logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
+				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
+			atomic.StoreUint32(&s.isReadOnly, 1)
+			return
+		}
+		if atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) {
+			logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left",
+				s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)
+		}
+	}
+	f()
+	s.freeDiskSpaceWatcherWG.Add(1)
+	go func() {
+		defer s.freeDiskSpaceWatcherWG.Done()
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-s.stopCh:
+				return
+			case <-ticker.C:
+				f()
+			}
+		}
+	}()
 }
 
 func (s *Storage) debugFlush() {
diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go
index 63404838c..d61035a5d 100644
--- a/lib/logstorage/storage_search_test.go
+++ b/lib/logstorage/storage_search_test.go
@@ -70,7 +70,7 @@ func TestStorageRunQuery(t *testing.T) {
 					})
 					lr.MustAdd(tenantID, timestamp, fields)
 				}
-				s.MustAddRows(lr)
+				_ = s.AddRows(lr)
 				PutLogRows(lr)
 			}
 		}
@@ -366,7 +366,7 @@ func TestStorageSearch(t *testing.T) {
 					})
 					lr.MustAdd(tenantID, timestamp, fields)
 				}
-				s.MustAddRows(lr)
+				_ = s.AddRows(lr)
 				PutLogRows(lr)
 			}
 		}
diff --git a/lib/logstorage/storage_test.go b/lib/logstorage/storage_test.go
index 193179bb1..9951a6a4c 100644
--- a/lib/logstorage/storage_test.go
+++ b/lib/logstorage/storage_test.go
@@ -32,7 +32,7 @@ func TestStorageMustAddRows(t *testing.T) {
 		lr := newTestLogRows(1, 1, 0)
 		lr.timestamps[0] = time.Now().UTC().UnixNano()
 		totalRowsCount += uint64(len(lr.timestamps))
-		s.MustAddRows(lr)
+		_ = s.AddRows(lr)
 		sStats.Reset()
 		s.UpdateStats(&sStats)
 		if n := sStats.RowsCount(); n != totalRowsCount {
@@ -56,7 +56,7 @@ func TestStorageMustAddRows(t *testing.T) {
 		lr.timestamps[i] = time.Now().UTC().UnixNano()
 	}
 	totalRowsCount += uint64(len(lr.timestamps))
-	s.MustAddRows(lr)
+	_ = s.AddRows(lr)
 	sStats.Reset()
 	s.UpdateStats(&sStats)
 	if n := sStats.RowsCount(); n != totalRowsCount {
@@ -80,7 +80,7 @@ func TestStorageMustAddRows(t *testing.T) {
 		now += nsecPerDay
 	}
 	totalRowsCount += uint64(len(lr.timestamps))
-	s.MustAddRows(lr)
+	_ = s.AddRows(lr)
 	sStats.Reset()
 	s.UpdateStats(&sStats)
 	if n := sStats.RowsCount(); n != totalRowsCount {