diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index e12e53ed0..6b25bb409 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -844,6 +844,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, limitUpper := 2 * limit q.AddPipeLimit(uint64(limitUpper)) q.Optimize() + rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) if err != nil { return nil, err @@ -854,7 +855,7 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, return rows, nil } - // Slow path - search for the time range containing up to limitUpper rows. + // Slow path - adjust time range for selecting up to limitUpper rows start, end := q.GetFilterTimeRange() d := end/2 - start/2 start += d @@ -868,18 +869,44 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, return nil, err } - if len(rows) >= limit && len(rows) < limitUpper || d == 0 { + if d == 0 || start >= end { + // The [start ... end] time range equals one nanosecond. + // Just return up to limit rows. + if len(rows) > limit { + rows = rows[:limit] + } + return rows, nil + } + + dLastBit := d & 1 + d /= 2 + + if len(rows) >= limitUpper { + // The number of found rows on the [start ... end] time range exceeds limitUpper, + // so reduce the time range to [start+d ... end]. + start += d + continue + } + if len(rows) >= limit { + // The number of found rows is in the range [limit ... limitUpper). + // This means that found rows contains the needed limit rows with the biggest timestamps. rows = getLastNRows(rows, limit) return rows, nil } - lastBit := d & 1 - d /= 2 - if len(rows) > limit { - start += d - } else { - start -= d + lastBit + // The number of found rows on [start ... end] time range is below the limit. + // This means the time range doesn't cover the needed logs, so it must be extended. + + if len(rows) == 0 { + // The [start ... end] time range doesn't contain any rows, so change it to [start-d ... start). + end = start - 1 + start -= d + dLastBit + continue } + + // The number of found rows on [start ... end] time range is bigger than 0 but smaller than limit. + // Increase the time range to [start-d ... end]. + start -= d + dLastBit } } @@ -900,20 +927,25 @@ func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.Tenant var rows []row var rowsLock sync.Mutex writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { - rowsLock.Lock() - defer rowsLock.Unlock() + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } for i, timestamp := range timestamps { fields := make([]logstorage.Field, len(columns)) for j := range columns { f := &fields[j] - f.Name = strings.Clone(columns[j].Name) + f.Name = clonedColumnNames[j] f.Value = strings.Clone(columns[j].Values[i]) } + + rowsLock.Lock() rows = append(rows, row{ timestamp: timestamp, fields: fields, }) + rowsLock.Unlock() } if len(rows) >= limit { diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 87ff9dc6b..58ce041b5 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -338,7 +338,8 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { // Clone returns a copy of q. func (q *Query) Clone() *Query { qStr := q.String() - qCopy, err := ParseQuery(qStr) + timestamp := q.GetTimestamp() + qCopy, err := ParseQueryAtTimestamp(qStr, timestamp) if err != nil { logger.Panicf("BUG: cannot parse %q: %s", qStr, err) } @@ -353,6 +354,7 @@ func (q *Query) CanReturnLastNResults() bool { *pipeFieldValues, *pipeLimit, *pipeOffset, + *pipeTop, *pipeSort, *pipeStats, *pipeUniq: diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 453080b21..ca78dd1f5 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2032,6 +2032,7 @@ func TestQueryCanReturnLastNResults(t *testing.T) { f("* | uniq (x)", false) f("* | field_names", false) f("* | field_values x", false) + f("* | top 5 by (x)", false) }