From 07bd118b1a19aec93f9d50e81c3aad299756b249 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 20 Jun 2023 21:16:11 -0700 Subject: [PATCH] app/vlinsert/elasticsearch: optimize parsing command line Just search for "create" or "index" substrings there instead of spending CPU time on its parsing --- app/vlinsert/elasticsearch/elasticsearch.go | 19 ++++------- .../elasticsearch/elasticsearch_test.go | 34 +++++++++++++++++-- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index 8a9d6d977..a387fc008 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -208,7 +208,7 @@ var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elastic func readBulkLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field), ) (bool, error) { - // Decode command, must be "create" or "index" + // Read the command, must be "create" or "index" if !sc.Scan() { if err := sc.Err(); err != nil { if errors.Is(err, bufio.ErrTooLong) { @@ -219,15 +219,10 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, return false, nil } line := sc.Bytes() - p := parserPool.Get() - v, err := p.ParseBytes(line) - if err != nil { - return false, fmt.Errorf(`cannot parse "create" or "index" command: %w`, err) + lineStr := bytesutil.ToUnsafeString(line) + if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) { + return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line) } - if v.GetObject("create") == nil && v.GetObject("index") == nil { - return false, fmt.Errorf(`unexpected command %q; expected "create" or "index"`, v) - } - parserPool.Put(p) // Decode log message if !sc.Scan() { @@ -242,14 +237,12 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, line = sc.Bytes() pctx := getParserCtx() if err := pctx.parseLogMessage(line); err != nil { - invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err) - return true, nil + return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } timestamp, err := extractTimestampFromFields(timeField, pctx.fields) if err != nil { - invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err) - return true, nil + return false, fmt.Errorf("cannot parse timestamp: %w", err) } updateMessageFieldName(msgField, pctx.fields) processLogMessage(timestamp, pctx.fields) diff --git a/app/vlinsert/elasticsearch/elasticsearch_test.go b/app/vlinsert/elasticsearch/elasticsearch_test.go index ff9a5a110..859e882f2 100644 --- a/app/vlinsert/elasticsearch/elasticsearch_test.go +++ b/app/vlinsert/elasticsearch/elasticsearch_test.go @@ -11,7 +11,33 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -func TestReadBulkRequest(t *testing.T) { +func TestReadBulkRequestFailure(t *testing.T) { + f := func(data string) { + t.Helper() + + processLogMessage := func(timestamp int64, fields []logstorage.Field) { + t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields) + } + + r := bytes.NewBufferString(data) + rows, err := readBulkRequest(r, false, "_time", "_msg", processLogMessage) + if err == nil { + t.Fatalf("expecting non-empty error") + } + if rows != 0 { + t.Fatalf("unexpected non-zero rows=%d", rows) + } + } + f("foobar") + f(`{}`) + f(`{"create":{}}`) + f(`{"creat":{}} +{}`) + f(`{"create":{}} +foobar`) +} + +func TestReadBulkRequestSuccess(t *testing.T) { f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) { t.Helper() @@ -66,11 +92,15 @@ func TestReadBulkRequest(t *testing.T) { } } + // Verify an empty data + f("", "_time", "_msg", 0, nil, "") + + // Verify non-empty data data := `{"create":{"_index":"filebeat-8.8.0"}} {"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"} {"create":{"_index":"filebeat-8.8.0"}} {"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"} -{"create":{"_index":"filebeat-8.8.0"}} +{"index":{"_index":"filebeat-8.8.0"}} {"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"} ` timeField := "@timestamp"