This commit is contained in:
Aliaksandr Valialkin 2024-06-04 15:21:08 +02:00
parent 0a81bf79ba
commit 74fe2f4cdc
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -355,19 +355,20 @@ type row struct {
}
func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
q.AddPipeLimit(uint64(limit + 1))
limitUpper := 2*limit
q.AddPipeLimit(uint64(limitUpper))
q.Optimize()
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1)
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
if err != nil {
return nil, err
}
if len(rows) <= limit {
// Fast path - the requested time range contains up to limit rows.
sortRowsByTime(rows)
if len(rows) < limitUpper {
// Fast path - the requested time range contains up to limitUpper rows.
rows = getLastNRows(rows, limit)
return rows, nil
}
// Slow path - search for the time range with the requested limit rows.
// Slow path - search for the time range containing up to limitUpper rows.
start, end := q.GetFilterTimeRange()
d := end/2 - start/2
start += d
@ -376,16 +377,13 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
for {
q = qOrig.Clone()
q.AddTimeFilter(start, end)
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1)
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
if err != nil {
return nil, err
}
if len(rows) == limit || len(rows) > limit && d < 10e6 || d == 0 {
sortRowsByTime(rows)
if len(rows) > limit {
rows = rows[len(rows)-limit:]
}
if len(rows) >= limit && len(rows) < limitUpper || d == 0 {
rows = getLastNRows(rows, limit)
return rows, nil
}
@ -399,10 +397,14 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
}
}
func sortRowsByTime(rows []row) {
func getLastNRows(rows []row, limit int) []row {
sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp
})
if len(rows) > limit {
rows = rows[len(rows)-limit:]
}
return rows
}
func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {