mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
7b33a27874
- Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes directly inside the Storage.IsReadOnly(). This should work fast in most cases. This simplifies the logic at lib/storage. - Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes. The background merge logic uses another mechanism for determining whether there is enough disk space for the merge - it reserves the needed disk space before the merge and releases it after the merge. This prevents from out of disk space errors during background merge. - Properly handle corner cases for flushing in-memory data to disk when the storage enters read-only mode. This is better than losing the in-memory data. - Return back Storage.MustAddRows() instead of Storage.AddRows(), since the only case when AddRows() can return error is when the storage is in read-only mode. This case must be handled by the caller by calling Storage.IsReadOnly() before adding rows to the storage. This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle errors returned by Storage.AddRows(). - Properly store parsed logs to Storage if parts of the request contain invalid log lines. Previously the parsed logs could be lost in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
70 lines
2 KiB
Go
70 lines
2 KiB
Go
package jsonline
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
func TestReadBulkRequestSuccess(t *testing.T) {
|
|
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
|
t.Helper()
|
|
|
|
var timestamps []int64
|
|
var result string
|
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
|
timestamps = append(timestamps, timestamp)
|
|
|
|
a := make([]string, len(fields))
|
|
for i, f := range fields {
|
|
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
|
|
}
|
|
s := "{" + strings.Join(a, ",") + "}\n"
|
|
result += s
|
|
}
|
|
|
|
// Read the request without compression
|
|
r := bytes.NewBufferString(data)
|
|
sc := bufio.NewScanner(r)
|
|
rows := 0
|
|
for {
|
|
ok, err := readLine(sc, timeField, msgField, processLogMessage)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
if !ok {
|
|
break
|
|
}
|
|
rows++
|
|
}
|
|
if rows != rowsExpected {
|
|
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
|
|
}
|
|
|
|
if !reflect.DeepEqual(timestamps, timestampsExpected) {
|
|
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
|
|
}
|
|
if result != resultExpected {
|
|
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
|
|
}
|
|
}
|
|
|
|
// Verify non-empty data
|
|
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
|
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
|
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
|
`
|
|
timeField := "@timestamp"
|
|
msgField := "message"
|
|
rowsExpected := 3
|
|
timestampsExpected := []int64{1686026891735000000, 1686026892735000000, 1686026893735000000}
|
|
resultExpected := `{"@timestamp":"","log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
|
|
{"@timestamp":"","_msg":"baz"}
|
|
{"_msg":"xyz","@timestamp":"","x":"y"}
|
|
`
|
|
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
|
|
}
|