diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index a387fc008..1a1526420 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -208,17 +208,22 @@ var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elastic func readBulkLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field), ) (bool, error) { + var line []byte + // Read the command, must be "create" or "index" - if !sc.Scan() { - if err := sc.Err(); err != nil { - if errors.Is(err, bufio.ErrTooLong) { - return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`, maxLineSizeBytes.IntN()) + for len(line) == 0 { + if !sc.Scan() { + if err := sc.Err(); err != nil { + if errors.Is(err, bufio.ErrTooLong) { + return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`, + maxLineSizeBytes.IntN()) + } + return false, err } - return false, err + return false, nil } - return false, nil + line = sc.Bytes() } - line := sc.Bytes() lineStr := bytesutil.ToUnsafeString(line) if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) { return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line) diff --git a/app/vlinsert/elasticsearch/elasticsearch_test.go b/app/vlinsert/elasticsearch/elasticsearch_test.go index 859e882f2..09d1bf770 100644 --- a/app/vlinsert/elasticsearch/elasticsearch_test.go +++ b/app/vlinsert/elasticsearch/elasticsearch_test.go @@ -94,6 +94,8 @@ func TestReadBulkRequestSuccess(t *testing.T) { // Verify an empty data f("", "_time", "_msg", 0, nil, "") + f("\n", "_time", "_msg", 0, nil, "") + f("\n\n", "_time", "_msg", 0, nil, "") // Verify non-empty data data := `{"create":{"_index":"filebeat-8.8.0"}}