mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vlinsert/elasticsearch: optimize parsing command line
Just search for "create" or "index" substrings there instead of spending CPU time on its parsing
This commit is contained in:
parent
4e56b4eb36
commit
07bd118b1a
2 changed files with 38 additions and 15 deletions
|
@ -208,7 +208,7 @@ var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elastic
|
||||||
func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
processLogMessage func(timestamp int64, fields []logstorage.Field),
|
processLogMessage func(timestamp int64, fields []logstorage.Field),
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
// Decode command, must be "create" or "index"
|
// Read the command, must be "create" or "index"
|
||||||
if !sc.Scan() {
|
if !sc.Scan() {
|
||||||
if err := sc.Err(); err != nil {
|
if err := sc.Err(); err != nil {
|
||||||
if errors.Is(err, bufio.ErrTooLong) {
|
if errors.Is(err, bufio.ErrTooLong) {
|
||||||
|
@ -219,15 +219,10 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
line := sc.Bytes()
|
line := sc.Bytes()
|
||||||
p := parserPool.Get()
|
lineStr := bytesutil.ToUnsafeString(line)
|
||||||
v, err := p.ParseBytes(line)
|
if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) {
|
||||||
if err != nil {
|
return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line)
|
||||||
return false, fmt.Errorf(`cannot parse "create" or "index" command: %w`, err)
|
|
||||||
}
|
}
|
||||||
if v.GetObject("create") == nil && v.GetObject("index") == nil {
|
|
||||||
return false, fmt.Errorf(`unexpected command %q; expected "create" or "index"`, v)
|
|
||||||
}
|
|
||||||
parserPool.Put(p)
|
|
||||||
|
|
||||||
// Decode log message
|
// Decode log message
|
||||||
if !sc.Scan() {
|
if !sc.Scan() {
|
||||||
|
@ -242,14 +237,12 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
line = sc.Bytes()
|
line = sc.Bytes()
|
||||||
pctx := getParserCtx()
|
pctx := getParserCtx()
|
||||||
if err := pctx.parseLogMessage(line); err != nil {
|
if err := pctx.parseLogMessage(line); err != nil {
|
||||||
invalidJSONLineLogger.Warnf("cannot parse json-encoded log entry: %s", err)
|
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp, err := extractTimestampFromFields(timeField, pctx.fields)
|
timestamp, err := extractTimestampFromFields(timeField, pctx.fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
invalidTimestampLogger.Warnf("skipping the log entry because cannot parse timestamp: %s", err)
|
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
updateMessageFieldName(msgField, pctx.fields)
|
updateMessageFieldName(msgField, pctx.fields)
|
||||||
processLogMessage(timestamp, pctx.fields)
|
processLogMessage(timestamp, pctx.fields)
|
||||||
|
|
|
@ -11,7 +11,33 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReadBulkRequest(t *testing.T) {
|
func TestReadBulkRequestFailure(t *testing.T) {
|
||||||
|
f := func(data string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
processLogMessage := func(timestamp int64, fields []logstorage.Field) {
|
||||||
|
t.Fatalf("unexpected call to processLogMessage with timestamp=%d, fields=%s", timestamp, fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewBufferString(data)
|
||||||
|
rows, err := readBulkRequest(r, false, "_time", "_msg", processLogMessage)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expecting non-empty error")
|
||||||
|
}
|
||||||
|
if rows != 0 {
|
||||||
|
t.Fatalf("unexpected non-zero rows=%d", rows)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f("foobar")
|
||||||
|
f(`{}`)
|
||||||
|
f(`{"create":{}}`)
|
||||||
|
f(`{"creat":{}}
|
||||||
|
{}`)
|
||||||
|
f(`{"create":{}}
|
||||||
|
foobar`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadBulkRequestSuccess(t *testing.T) {
|
||||||
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
f := func(data, timeField, msgField string, rowsExpected int, timestampsExpected []int64, resultExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
@ -66,11 +92,15 @@ func TestReadBulkRequest(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify an empty data
|
||||||
|
f("", "_time", "_msg", 0, nil, "")
|
||||||
|
|
||||||
|
// Verify non-empty data
|
||||||
data := `{"create":{"_index":"filebeat-8.8.0"}}
|
data := `{"create":{"_index":"filebeat-8.8.0"}}
|
||||||
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
{"@timestamp":"2023-06-06T04:48:11.735Z","log":{"offset":71770,"file":{"path":"/var/log/auth.log"}},"message":"foobar"}
|
||||||
{"create":{"_index":"filebeat-8.8.0"}}
|
{"create":{"_index":"filebeat-8.8.0"}}
|
||||||
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
{"@timestamp":"2023-06-06T04:48:12.735Z","message":"baz"}
|
||||||
{"create":{"_index":"filebeat-8.8.0"}}
|
{"index":{"_index":"filebeat-8.8.0"}}
|
||||||
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
{"message":"xyz","@timestamp":"2023-06-06T04:48:13.735Z","x":"y"}
|
||||||
`
|
`
|
||||||
timeField := "@timestamp"
|
timeField := "@timestamp"
|
||||||
|
|
Loading…
Reference in a new issue