app/vlinsert/elasticsearch: optimize parsing command line

Just search for "create" or "index" substrings there instead of spending CPU time on its parsing
This commit is contained in:
Aliaksandr Valialkin 2023-06-20 21:16:11 -07:00
parent ca41a164cc
commit 7f146543ef
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 38 additions and 15 deletions

View file

@ -208,7 +208,7 @@ var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elastic
func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field),
) (bool, error) {
// Decode command, must be "create" or "index"
// Read the command, must be "create" or "index"
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
@ -219,15 +219,10 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
return false, nil
}
line := sc.Bytes()
p := parserPool.Get()
v, err := p.ParseBytes(line)
if err != nil {
return false, fmt.Errorf(`cannot parse "create" or "index" command: %w`, err)
lineStr := bytesutil.ToUnsafeString(line)
if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) {
return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line)
}
if v.GetObject("create") == nil && v.GetObject("index") == nil {
return false, fmt.Errorf(`unexpected command %q; expected "create" or "index"`, v)
}
parserPool.Put(p)
// Decode log message
if !sc.Scan() {
@ -242,14 +237,12 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
line = sc.Bytes()
pctx := getParserCtx()
if err := pctx.parseLogMessage(line); err != nil {
invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err)
return true, nil
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
timestamp, err := extractTimestampFromFields(timeField, pctx.fields)
if err != nil {
invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err)
return true, nil
return false, fmt.Errorf("cannot parse timestamp: %w", err)
}
updateMessageFieldName(msgField, pctx.fields)
processLogMessage(timestamp, pctx.fields)

View file

@ -11,7 +11,33 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func TestReadBulkRequest(t *testing.T) {
func TestReadBulkRequestFailure(t *testing.T) {
f := func(data string) {
t.Helper()
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
}
r := bytes.NewBufferString(data)
rows, err := readBulkRequest(r, false, "_time", "_msg", processLogMessage)
if err == nil {
t.Fatalf("expecting non-empty error")
}
if rows != 0 {
t.Fatalf("unexpected non-zero rows=%d", rows)
}
}
f("foobar")
f(`{}`)
f(`{"create":{}}`)
f(`{"creat":{}}
{}`)
f(`{"create":{}}
foobar`)
}
func TestReadBulkRequestSuccess(t *testing.T) {
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
t.Helper()
@ -66,11 +92,15 @@ func TestReadBulkRequest(t *testing.T) {
}
}
// Verify an empty data
f("", "_time", "_msg", 0, nil, "")
// Verify non-empty data
data := `{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
{"create":{"_index":"filebeat-8.8.0"}}
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
{"create":{"_index":"filebeat-8.8.0"}}
{"index":{"_index":"filebeat-8.8.0"}}
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
`
timeField := "@timestamp"