From 657988ac3aabad2d2f6021040633a202587b7bf0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 8 Sep 2024 14:13:38 +0200 Subject: [PATCH] app/vlselect: consistently reuse the original query timestamp when executing /select/logsql/query with positive limit=N query arg Previously the query could return incorrect results, since the query timestamp was updated with every Query.Clone() call during iterative search for the time range with up to limit=N rows. While at it, optimize queries that find a low number of matching logs while spending a lot of CPU time searching across a big number of logs. The optimization reduces the upper bound of the time range to search if the current time range contains zero matching rows. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785 --- app/vlselect/logsql/logsql.go | 54 ++++++++++++++++++++++++++++------- lib/logstorage/parser.go | 4 ++- lib/logstorage/parser_test.go | 1 + 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index e12e53ed0..6b25bb409 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -844,6 +844,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, limitUpper := 2 * limit q.AddPipeLimit(uint64(limitUpper)) q.Optimize() + rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) if err != nil { return nil, err @@ -854,7 +855,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, return rows, nil } - // Slow path - search for the time range containing up to limitUpper rows. 
+ // Slow path - adjust time range for selecting up to limitUpper rows start, end := q.GetFilterTimeRange() d := end/2 - start/2 start += d @@ -868,18 +869,44 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, return nil, err } - if len(rows) >= limit && len(rows) < limitUpper || d == 0 { + if d == 0 || start >= end { + // The [start ... end] time range equals one nanosecond. + // Just return up to limit rows. + if len(rows) > limit { + rows = rows[:limit] + } + return rows, nil + } + + dLastBit := d & 1 + d /= 2 + + if len(rows) >= limitUpper { + // The number of found rows on the [start ... end] time range exceeds limitUpper, + // so reduce the time range to [start+d ... end]. + start += d + continue + } + if len(rows) >= limit { + // The number of found rows is in the range [limit ... limitUpper). + // This means that found rows contains the needed limit rows with the biggest timestamps. rows = getLastNRows(rows, limit) return rows, nil } - lastBit := d & 1 - d /= 2 - if len(rows) > limit { - start += d - } else { - start -= d + lastBit + // The number of found rows on [start ... end] time range is below the limit. + // This means the time range doesn't cover the needed logs, so it must be extended. + + if len(rows) == 0 { + // The [start ... end] time range doesn't contain any rows, so change it to [start-d ... start). + end = start - 1 + start -= d + dLastBit + continue } + + // The number of found rows on [start ... end] time range is bigger than 0 but smaller than limit. + // Increase the time range to [start-d ... end]. 
+ start -= d + dLastBit } } @@ -900,20 +927,25 @@ func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.Tenant var rows []row var rowsLock sync.Mutex writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { - rowsLock.Lock() - defer rowsLock.Unlock() + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } for i, timestamp := range timestamps { fields := make([]logstorage.Field, len(columns)) for j := range columns { f := &fields[j] - f.Name = strings.Clone(columns[j].Name) + f.Name = clonedColumnNames[j] f.Value = strings.Clone(columns[j].Values[i]) } + + rowsLock.Lock() rows = append(rows, row{ timestamp: timestamp, fields: fields, }) + rowsLock.Unlock() } if len(rows) >= limit { diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 87ff9dc6b..58ce041b5 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -338,7 +338,8 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { // Clone returns a copy of q. func (q *Query) Clone() *Query { qStr := q.String() - qCopy, err := ParseQuery(qStr) + timestamp := q.GetTimestamp() + qCopy, err := ParseQueryAtTimestamp(qStr, timestamp) if err != nil { logger.Panicf("BUG: cannot parse %q: %s", qStr, err) } @@ -353,6 +354,7 @@ func (q *Query) CanReturnLastNResults() bool { *pipeFieldValues, *pipeLimit, *pipeOffset, + *pipeTop, *pipeSort, *pipeStats, *pipeUniq: diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 453080b21..ca78dd1f5 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2032,6 +2032,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) { f("* | uniq (x)", false) f("* | field_names", false) f("* | field_values x", false) + f("* | top 5 by (x)", false) }