mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vlselect: consistently reuse the original query timestamp when executing /select/logsql/query with positive limit=N query arg
Previously the query could return incorrect results, since the query timestamp was updated with every Query.Clone() call during iterative search for the time range with up to limit=N rows. While at it, optimize queries, which find low number of matching logs, while spend a lot of CPU time for searching across big number of logs. The optimization reduces the upper bound of the time range to search if the current time range contains zero matching rows. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6785
This commit is contained in:
parent
297301e8c0
commit
cad236003b
3 changed files with 47 additions and 12 deletions
|
@ -844,6 +844,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
|
||||||
limitUpper := 2 * limit
|
limitUpper := 2 * limit
|
||||||
q.AddPipeLimit(uint64(limitUpper))
|
q.AddPipeLimit(uint64(limitUpper))
|
||||||
q.Optimize()
|
q.Optimize()
|
||||||
|
|
||||||
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
|
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -854,7 +855,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
|
||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path - search for the time range containing up to limitUpper rows.
|
// Slow path - adjust time range for selecting up to limitUpper rows
|
||||||
start, end := q.GetFilterTimeRange()
|
start, end := q.GetFilterTimeRange()
|
||||||
d := end/2 - start/2
|
d := end/2 - start/2
|
||||||
start += d
|
start += d
|
||||||
|
@ -868,18 +869,44 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rows) >= limit && len(rows) < limitUpper || d == 0 {
|
if d == 0 || start >= end {
|
||||||
|
// The [start ... end] time range equals one nanosecond.
|
||||||
|
// Just return up to limit rows.
|
||||||
|
if len(rows) > limit {
|
||||||
|
rows = rows[:limit]
|
||||||
|
}
|
||||||
|
return rows, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
dLastBit := d & 1
|
||||||
|
d /= 2
|
||||||
|
|
||||||
|
if len(rows) >= limitUpper {
|
||||||
|
// The number of found rows on the [start ... end] time range exceeds limitUpper,
|
||||||
|
// so reduce the time range to [start+d ... end].
|
||||||
|
start += d
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(rows) >= limit {
|
||||||
|
// The number of found rows is in the range [limit ... limitUpper).
|
||||||
|
// This means that found rows contains the needed limit rows with the biggest timestamps.
|
||||||
rows = getLastNRows(rows, limit)
|
rows = getLastNRows(rows, limit)
|
||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lastBit := d & 1
|
// The number of found rows on [start ... end] time range is below the limit.
|
||||||
d /= 2
|
// This means the time range doesn't cover the needed logs, so it must be extended.
|
||||||
if len(rows) > limit {
|
|
||||||
start += d
|
if len(rows) == 0 {
|
||||||
} else {
|
// The [start ... end] time range doesn't contain any rows, so change it to [start-d ... start).
|
||||||
start -= d + lastBit
|
end = start - 1
|
||||||
|
start -= d + dLastBit
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The number of found rows on [start ... end] time range is bigger than 0 but smaller than limit.
|
||||||
|
// Increase the time range to [start-d ... end].
|
||||||
|
start -= d + dLastBit
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -900,20 +927,25 @@ func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.Tenant
|
||||||
var rows []row
|
var rows []row
|
||||||
var rowsLock sync.Mutex
|
var rowsLock sync.Mutex
|
||||||
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
|
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
|
||||||
rowsLock.Lock()
|
clonedColumnNames := make([]string, len(columns))
|
||||||
defer rowsLock.Unlock()
|
for i, c := range columns {
|
||||||
|
clonedColumnNames[i] = strings.Clone(c.Name)
|
||||||
|
}
|
||||||
|
|
||||||
for i, timestamp := range timestamps {
|
for i, timestamp := range timestamps {
|
||||||
fields := make([]logstorage.Field, len(columns))
|
fields := make([]logstorage.Field, len(columns))
|
||||||
for j := range columns {
|
for j := range columns {
|
||||||
f := &fields[j]
|
f := &fields[j]
|
||||||
f.Name = strings.Clone(columns[j].Name)
|
f.Name = clonedColumnNames[j]
|
||||||
f.Value = strings.Clone(columns[j].Values[i])
|
f.Value = strings.Clone(columns[j].Values[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rowsLock.Lock()
|
||||||
rows = append(rows, row{
|
rows = append(rows, row{
|
||||||
timestamp: timestamp,
|
timestamp: timestamp,
|
||||||
fields: fields,
|
fields: fields,
|
||||||
})
|
})
|
||||||
|
rowsLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rows) >= limit {
|
if len(rows) >= limit {
|
||||||
|
|
|
@ -338,7 +338,8 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
|
||||||
// Clone returns a copy of q.
|
// Clone returns a copy of q.
|
||||||
func (q *Query) Clone() *Query {
|
func (q *Query) Clone() *Query {
|
||||||
qStr := q.String()
|
qStr := q.String()
|
||||||
qCopy, err := ParseQuery(qStr)
|
timestamp := q.GetTimestamp()
|
||||||
|
qCopy, err := ParseQueryAtTimestamp(qStr, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("BUG: cannot parse %q: %s", qStr, err)
|
logger.Panicf("BUG: cannot parse %q: %s", qStr, err)
|
||||||
}
|
}
|
||||||
|
@ -353,6 +354,7 @@ func (q *Query) CanReturnLastNResults() bool {
|
||||||
*pipeFieldValues,
|
*pipeFieldValues,
|
||||||
*pipeLimit,
|
*pipeLimit,
|
||||||
*pipeOffset,
|
*pipeOffset,
|
||||||
|
*pipeTop,
|
||||||
*pipeSort,
|
*pipeSort,
|
||||||
*pipeStats,
|
*pipeStats,
|
||||||
*pipeUniq:
|
*pipeUniq:
|
||||||
|
|
|
@ -2032,6 +2032,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
|
||||||
f("* | uniq (x)", false)
|
f("* | uniq (x)", false)
|
||||||
f("* | field_names", false)
|
f("* | field_names", false)
|
||||||
f("* | field_values x", false)
|
f("* | field_values x", false)
|
||||||
|
f("* | top 5 by (x)", false)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue