mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: allow writing after N
in front of before N
at stream_context
pipe
This commit is contained in:
parent
e11f0aa9ec
commit
3b6c78c26c
2 changed files with 43 additions and 34 deletions
|
@ -503,38 +503,9 @@ func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) {
|
||||||
}
|
}
|
||||||
lex.nextToken()
|
lex.nextToken()
|
||||||
|
|
||||||
linesBefore := 0
|
linesBefore, linesAfter, err := parsePipeStreamContextBeforeAfter(lex)
|
||||||
beforeSet := false
|
if err != nil {
|
||||||
if lex.isKeyword("before") {
|
return nil, err
|
||||||
lex.nextToken()
|
|
||||||
f, s, err := parseNumber(lex)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err)
|
|
||||||
}
|
|
||||||
if f < 0 {
|
|
||||||
return nil, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s)
|
|
||||||
}
|
|
||||||
linesBefore = int(f)
|
|
||||||
beforeSet = true
|
|
||||||
}
|
|
||||||
|
|
||||||
linesAfter := 0
|
|
||||||
afterSet := false
|
|
||||||
if lex.isKeyword("after") {
|
|
||||||
lex.nextToken()
|
|
||||||
f, s, err := parseNumber(lex)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err)
|
|
||||||
}
|
|
||||||
if f < 0 {
|
|
||||||
return nil, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s)
|
|
||||||
}
|
|
||||||
linesAfter = int(f)
|
|
||||||
afterSet = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if !beforeSet && !afterSet {
|
|
||||||
return nil, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pc := &pipeStreamContext{
|
pc := &pipeStreamContext{
|
||||||
|
@ -543,3 +514,41 @@ func parsePipeStreamContext(lex *lexer) (*pipeStreamContext, error) {
|
||||||
}
|
}
|
||||||
return pc, nil
|
return pc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parsePipeStreamContextBeforeAfter(lex *lexer) (int, int, error) {
|
||||||
|
linesBefore := 0
|
||||||
|
linesAfter := 0
|
||||||
|
beforeSet := false
|
||||||
|
afterSet := false
|
||||||
|
for {
|
||||||
|
switch {
|
||||||
|
case lex.isKeyword("before"):
|
||||||
|
lex.nextToken()
|
||||||
|
f, s, err := parseNumber(lex)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("cannot parse 'before' value in 'stream_context': %w", err)
|
||||||
|
}
|
||||||
|
if f < 0 {
|
||||||
|
return 0, 0, fmt.Errorf("'before' value cannot be smaller than 0; got %q", s)
|
||||||
|
}
|
||||||
|
linesBefore = int(f)
|
||||||
|
beforeSet = true
|
||||||
|
case lex.isKeyword("after"):
|
||||||
|
lex.nextToken()
|
||||||
|
f, s, err := parseNumber(lex)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("cannot parse 'after' value in 'stream_context': %w", err)
|
||||||
|
}
|
||||||
|
if f < 0 {
|
||||||
|
return 0, 0, fmt.Errorf("'after' value cannot be smaller than 0; got %q", s)
|
||||||
|
}
|
||||||
|
linesAfter = int(f)
|
||||||
|
afterSet = true
|
||||||
|
default:
|
||||||
|
if !beforeSet && !afterSet {
|
||||||
|
return 0, 0, fmt.Errorf("missing 'before N' or 'after N' in 'stream_context'")
|
||||||
|
}
|
||||||
|
return linesBefore, linesAfter, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -104,10 +104,10 @@ func TestPipeStreamContextUpdateNeededFields(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// all the needed fields
|
// all the needed fields
|
||||||
f("stream_context before 10", "*", "", "*", "")
|
f("stream_context after 4 before 10", "*", "", "*", "")
|
||||||
|
|
||||||
// plus unneeded fields
|
// plus unneeded fields
|
||||||
f("stream_context after 4", "*", "f1,f2", "*", "f1,f2")
|
f("stream_context before 10 after 4", "*", "f1,f2", "*", "f1,f2")
|
||||||
f("stream_context after 4", "*", "_time,f1,_stream_id", "*", "f1")
|
f("stream_context after 4", "*", "_time,f1,_stream_id", "*", "f1")
|
||||||
|
|
||||||
// needed fields
|
// needed fields
|
||||||
|
|
Loading…
Reference in a new issue