app/vlselect: consistently reuse the original query timestamp when executing /select/logsql/query with positive limit=N query arg

Previously the query could return incorrect results, since the query timestamp was updated on every Query.Clone() call
during the iterative search for a time range containing up to limit=N rows.
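
For illustration, a minimal sketch of the intended behaviour, using only the lib/logstorage calls that appear in the diff below (ParseQueryAtTimestamp, Clone, GetFilterTimeRange); the concrete timestamp and query string are made-up examples, not taken from the commit:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	// Parse the query at an explicit timestamp; relative filters such as
	// _time:5m are resolved against this instant.
	q, err := logstorage.ParseQueryAtTimestamp(`_time:5m error`, 1725790418000000000)
	if err != nil {
		panic(err)
	}

	// After this change, Clone() re-parses the query at the timestamp captured
	// above, so the effective [start ... end] range stays identical across the
	// iterations of the limit=N search instead of drifting with the wall clock.
	qCopy := q.Clone()
	start, end := qCopy.GetFilterTimeRange()
	fmt.Println(start, end)
}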

While at it, optimize queries that find a small number of matching logs while spending a lot of CPU time searching
across a big number of logs. The optimization reduces the upper bound of the time range to search when the current time range
contains zero matching rows.
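
The search itself is a binary-search-like loop over the time range. The following standalone sketch models that loop; countInRange and findStartForLastN are hypothetical stand-ins for the real getQueryResultsWithLimit and getLastNQueryResults shown in the diff below, trimmed to the range-adjustment logic only:

package main

import "fmt"

// countInRange stands in for the real query execution and simply counts
// timestamps that fall into [start ... end].
func countInRange(tss []int64, start, end int64) int {
	n := 0
	for _, ts := range tss {
		if ts >= start && ts <= end {
			n++
		}
	}
	return n
}

// findStartForLastN adjusts start (and, when a probed range is empty, also end)
// until [start ... end] holds roughly the newest `limit` timestamps.
func findStartForLastN(tss []int64, start, end int64, limit int) int64 {
	limitUpper := 2 * limit
	d := end/2 - start/2
	start += d
	for {
		n := countInRange(tss, start, end)
		if d == 0 || start >= end {
			// The range cannot shrink further.
			return start
		}
		dLastBit := d & 1
		d /= 2
		switch {
		case n >= limitUpper:
			// Too many rows - drop the older half of the range.
			start += d
		case n >= limit:
			// Between limit and limitUpper rows - good enough.
			return start
		case n == 0:
			// Empty range - also lower the upper bound, so later probes
			// skip the region known to contain no rows.
			end = start - 1
			start -= d + dLastBit
		default:
			// Fewer than limit rows - extend the range into older data.
			start -= d + dLastBit
		}
	}
}

func main() {
	tss := []int64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
	fmt.Println("adjusted start:", findStartForLastN(tss, 0, 100, 3)) // prints 75
}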

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785
commit 657988ac3a
parent 45a3713bdb
Author: Aliaksandr Valialkin
Date:   2024-09-08 14:13:38 +02:00
3 changed files with 47 additions and 12 deletions


@@ -844,6 +844,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
     limitUpper := 2 * limit
     q.AddPipeLimit(uint64(limitUpper))
     q.Optimize()
+
     rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
     if err != nil {
         return nil, err
@@ -854,7 +855,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
         return rows, nil
     }

-    // Slow path - search for the time range containing up to limitUpper rows.
+    // Slow path - adjust time range for selecting up to limitUpper rows
     start, end := q.GetFilterTimeRange()
     d := end/2 - start/2
     start += d
@@ -868,18 +869,44 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
             return nil, err
         }
-        if len(rows) >= limit && len(rows) < limitUpper || d == 0 {
+
+        if d == 0 || start >= end {
+            // The [start ... end] time range equals one nanosecond.
+            // Just return up to limit rows.
+            if len(rows) > limit {
+                rows = rows[:limit]
+            }
+            return rows, nil
+        }
+
+        dLastBit := d & 1
+        d /= 2
+
+        if len(rows) >= limitUpper {
+            // The number of found rows on the [start ... end] time range exceeds limitUpper,
+            // so reduce the time range to [start+d ... end].
+            start += d
+            continue
+        }
+        if len(rows) >= limit {
+            // The number of found rows is in the range [limit ... limitUpper).
+            // This means that found rows contains the needed limit rows with the biggest timestamps.
             rows = getLastNRows(rows, limit)
             return rows, nil
         }
-        lastBit := d & 1
-        d /= 2
-        if len(rows) > limit {
-            start += d
-        } else {
-            start -= d + lastBit
+
+        // The number of found rows on [start ... end] time range is below the limit.
+        // This means the time range doesn't cover the needed logs, so it must be extended.
+
+        if len(rows) == 0 {
+            // The [start ... end] time range doesn't contain any rows, so change it to [start-d ... start).
+            end = start - 1
+            start -= d + dLastBit
+            continue
         }
+
+        // The number of found rows on [start ... end] time range is bigger than 0 but smaller than limit.
+        // Increase the time range to [start-d ... end].
+        start -= d + dLastBit
     }
 }
@@ -900,20 +927,25 @@ func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.Tenant
     var rows []row
     var rowsLock sync.Mutex
     writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
-        rowsLock.Lock()
-        defer rowsLock.Unlock()
+        clonedColumnNames := make([]string, len(columns))
+        for i, c := range columns {
+            clonedColumnNames[i] = strings.Clone(c.Name)
+        }
         for i, timestamp := range timestamps {
             fields := make([]logstorage.Field, len(columns))
             for j := range columns {
                 f := &fields[j]
-                f.Name = strings.Clone(columns[j].Name)
+                f.Name = clonedColumnNames[j]
                 f.Value = strings.Clone(columns[j].Values[i])
             }
+
+            rowsLock.Lock()
             rows = append(rows, row{
                 timestamp: timestamp,
                 fields: fields,
             })
+            rowsLock.Unlock()
         }
         if len(rows) >= limit {
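
The writeBlock change above narrows the critical section: column names are cloned once per block instead of once per row, and rowsLock is now held only around the append to the shared rows slice. A tiny standalone sketch of that locking pattern (hypothetical names, not the VictoriaMetrics code):

package main

import (
	"fmt"
	"strings"
	"sync"
)

var (
	resultsLock sync.Mutex
	results     []string
)

// appendCloned does the per-item work (string cloning) outside the mutex and
// holds resultsLock only while appending to the shared slice.
func appendCloned(values []string) {
	cloned := make([]string, 0, len(values))
	for _, v := range values {
		cloned = append(cloned, strings.Clone(v))
	}
	resultsLock.Lock()
	results = append(results, cloned...)
	resultsLock.Unlock()
}

func main() {
	appendCloned([]string{"a", "b", "c"})
	fmt.Println(results)
}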


@@ -338,7 +338,8 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
 // Clone returns a copy of q.
 func (q *Query) Clone() *Query {
     qStr := q.String()
-    qCopy, err := ParseQuery(qStr)
+    timestamp := q.GetTimestamp()
+    qCopy, err := ParseQueryAtTimestamp(qStr, timestamp)
     if err != nil {
         logger.Panicf("BUG: cannot parse %q: %s", qStr, err)
     }
@@ -353,6 +354,7 @@ func (q *Query) CanReturnLastNResults() bool {
         *pipeFieldValues,
         *pipeLimit,
         *pipeOffset,
+        *pipeTop,
         *pipeSort,
         *pipeStats,
         *pipeUniq:


@@ -2032,6 +2032,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
     f("* | uniq (x)", false)
     f("* | field_names", false)
     f("* | field_values x", false)
+    f("* | top 5 by (x)", false)
 }