diff --git a/lib/logstorage/pipe_stream_context.go b/lib/logstorage/pipe_stream_context.go index 51add0f3a..b35824a2c 100644 --- a/lib/logstorage/pipe_stream_context.go +++ b/lib/logstorage/pipe_stream_context.go @@ -503,38 +503,9 @@ func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) { } lex.nextToken() - linesBefore := 0 - beforeSet := false - if lex.isKeyword("before") { - lex.nextToken() - f, s, err := parseNumber(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err) - } - if f < 0 { - return nil, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s) - } - linesBefore = int(f) - beforeSet = true - } - - linesAfter := 0 - afterSet := false - if lex.isKeyword("after") { - lex.nextToken() - f, s, err := parseNumber(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err) - } - if f < 0 { - return nil, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s) - } - linesAfter = int(f) - afterSet = true - } - - if !beforeSet && !afterSet { - return nil, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'") + linesBefore, linesAfter, err := parsePipeStreamContextBeforeAfter(lex) + if err != nil { + return nil, err } pc := &pipeStreamContext{ @@ -543,3 +514,41 @@ func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) { } return pc, nil } + +func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) { + linesBefore := 0 + linesAfter := 0 + beforeSet := false + afterSet := false + for { + switch { + case lex.isKeyword("before"): + lex.nextToken() + f, s, err := parseNumber(lex) + if err != nil { + return 0, 0, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err) + } + if f < 0 { + return 0, 0, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s) + } + linesBefore = int(f) + beforeSet = true + case lex.isKeyword("after"): + lex.nextToken() + f, s, err := parseNumber(lex) + if err != nil { + return 0, 0, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err) + } + if f < 0 { + return 0, 0, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s) + } + linesAfter = int(f) + afterSet = true + default: + if !beforeSet && !afterSet { + return 0, 0, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'") + } + return linesBefore, linesAfter, nil + } + } +} diff --git a/lib/logstorage/pipe_stream_context_test.go b/lib/logstorage/pipe_stream_context_test.go index 12943f85f..cd63f4128 100644 --- a/lib/logstorage/pipe_stream_context_test.go +++ b/lib/logstorage/pipe_stream_context_test.go @@ -104,10 +104,10 @@ func TestPipeStreamContextUpdateNeededFields(t *testing.T) { } // all the needed fields - f("stream_context before 10", "*", "", "*", "") + f("stream_context after 4 before 10", "*", "", "*", "") // plus unneeded fields - f("stream_context after 4", "*", "f1,f2", "*", "f1,f2") + f("stream_context before 10 after 4", "*", "f1,f2", "*", "f1,f2") f("stream_context after 4", "*", "_time,f1,_stream_id", "*", "f1") // needed fields