VictoriaMetrics/app/vlinsert/jsonline/jsonline_test.go
Aliaksandr Valialkin 7b33a27874
lib/logstorage: follow-up for 8a23d08c21
- Compare the actual free disk space to the value provided via -storage.minFreeDiskSpaceBytes
  directly inside the Storage.IsReadOnly(). This should work fast in most cases.
  This simplifies the logic at lib/storage.

- Do not take into account -storage.minFreeDiskSpaceBytes during background merges, since
  it results in uncontrolled growth of small parts when the free disk space approaches -storage.minFreeDiskSpaceBytes.
  The background merge logic uses another mechanism for determining whether there is enough
  disk space for the merge - it reserves the needed disk space before the merge
  and releases it after the merge. This prevents from out of disk space errors during background merge.

- Properly handle corner cases for flushing in-memory data to disk when the storage
  enters read-only mode. This is better than losing the in-memory data.

- Return back Storage.MustAddRows() instead of Storage.AddRows(),
  since the only case when AddRows() can return error is when the storage is in read-only mode.
  This case must be handled by the caller by calling Storage.IsReadOnly()
  before adding rows to the storage.
  This simplifies the code a bit, since the caller of Storage.MustAddRows() shouldn't handle
  errors returned by Storage.AddRows().

- Properly store parsed logs to Storage if parts of the request contain invalid log lines.
  Previously the parsed logs could be lost in this case.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4737
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4945
2023-10-02 16:52:23 +02:00

70 lines
2 KiB
Go

package jsonline
import (
"bufio"
"bytes"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"reflect"
"strings"
"testing"
)
func TestReadBulkRequestSuccess(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper()
var timestamps []int64
var result string
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
timestamps = append(timestamps, timestamp)
a := make([]string, len(fields))
for i, f := range fields {
a[i] = fmt.Sprintf("%q:%q", f.Name, f.Value)
}
s := "{" + strings.Join(a, ",") + "}\n"
result += s
}
// Read the request without compression
r := bytes.NewBufferString(data)
sc := bufio.NewScanner(r)
rows := 0
for {
ok, err := readLine(sc, timeField, msgField, processLogMessage)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !ok {
break
}
rows++
}
if rows != rowsExpected {
t.Fatalf("unexpected rows read; got %d; want %d", rows, rowsExpected)
}
if !reflect.DeepEqual(timestamps, timestampsExpected) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, timestampsExpected)
}
if result != resultExpected {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
// Verify non-empty data
data := `{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
`
timeField := "@timestamp"
msgField := "message"
rowsExpected := 3
timestampsExpected := []int64{1686026891735000000, 1686026892735000000, 1686026893735000000}
resultExpected := `{"@timestamp":"","log.offset":"71770","log.file.path":"/var/log/auth.log","_msg":"foobar"}
{"@timestamp":"","_msg":"baz"}
{"_msg":"xyz","@timestamp":"","x":"y"}
`
f(data, timeField, msgField, rowsExpected, timestampsExpected, resultExpected)
}