app/vlinsert/elasticsearch: allow empty lines in Elasticsearch bulk protocol

Empty lines may appear there during debugging and custom client implementation
This commit is contained in:
Aliaksandr Valialkin 2023-06-20 21:23:36 -07:00
parent 07bd118b1a
commit 3edc548584
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 14 additions and 7 deletions

View file

@ -208,17 +208,22 @@ var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elastic
func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field),
) (bool, error) {
var line []byte
// Read the command, must be "create" or "index"
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`, maxLineSizeBytes.IntN())
for len(line) == 0 {
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
maxLineSizeBytes.IntN())
}
return false, err
}
return false, err
return false, nil
}
return false, nil
line = sc.Bytes()
}
line := sc.Bytes()
lineStr := bytesutil.ToUnsafeString(line)
if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) {
return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line)

View file

@ -94,6 +94,8 @@ func TestReadBulkRequestSuccess(t *testing.T) {
// Verify an empty data
f("", "_time", "_msg", 0, nil, "")
f("\n", "_time", "_msg", 0, nil, "")
f("\n\n", "_time", "_msg", 0, nil, "")
// Verify non-empty data
data := `{"create":{"_index":"filebeat-8.8.0"}}