2023-06-20 05:55:12 +00:00
package logsql
import (
2024-05-12 14:33:29 +00:00
"context"
2024-05-15 02:55:44 +00:00
"fmt"
"math"
2023-06-20 05:55:12 +00:00
"net/http"
2024-05-20 02:08:30 +00:00
"sort"
"strings"
"sync"
2024-05-15 02:55:44 +00:00
"time"
2023-06-20 05:55:12 +00:00
2024-06-27 12:18:42 +00:00
"github.com/VictoriaMetrics/metrics"
2023-06-20 05:55:12 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2024-02-18 20:58:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
2024-06-27 13:05:57 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2023-06-20 05:55:12 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
2024-05-15 02:55:44 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2023-06-20 05:55:12 +00:00
)
2024-05-20 02:08:30 +00:00
// ProcessHitsRequest handles /select/logsql/hits request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats
func ProcessHitsRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
2023-06-20 05:55:12 +00:00
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
// Obtain step
stepStr := r . FormValue ( "step" )
if stepStr == "" {
stepStr = "1d"
}
step , err := promutils . ParseDuration ( stepStr )
2023-06-20 05:55:12 +00:00
if err != nil {
2024-05-20 02:08:30 +00:00
httpserver . Errorf ( w , r , "cannot parse 'step' arg: %s" , err )
2023-06-20 05:55:12 +00:00
return
}
2024-05-20 02:08:30 +00:00
if step <= 0 {
httpserver . Errorf ( w , r , "'step' must be bigger than zero" )
}
2024-02-18 21:01:34 +00:00
2024-05-20 02:08:30 +00:00
// Obtain offset
offsetStr := r . FormValue ( "offset" )
if offsetStr == "" {
offsetStr = "0s"
}
offset , err := promutils . ParseDuration ( offsetStr )
if err != nil {
httpserver . Errorf ( w , r , "cannot parse 'offset' arg: %s" , err )
return
}
// Obtain field entries
fields := r . Form [ "field" ]
// Prepare the query
q . AddCountByTimePipe ( int64 ( step ) , int64 ( offset ) , fields )
q . Optimize ( )
var mLock sync . Mutex
m := make ( map [ string ] * hitsSeries )
writeBlock := func ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
if len ( columns ) == 0 || len ( columns [ 0 ] . Values ) == 0 {
return
}
timestampValues := columns [ 0 ] . Values
hitsValues := columns [ len ( columns ) - 1 ] . Values
columns = columns [ 1 : len ( columns ) - 1 ]
bb := blockResultPool . Get ( )
for i := range timestamps {
timestampStr := strings . Clone ( timestampValues [ i ] )
hitsStr := strings . Clone ( hitsValues [ i ] )
bb . Reset ( )
2024-05-25 19:36:16 +00:00
WriteFieldsForHits ( bb , columns , i )
2024-05-20 02:08:30 +00:00
mLock . Lock ( )
hs , ok := m [ string ( bb . B ) ]
if ! ok {
k := string ( bb . B )
hs = & hitsSeries { }
m [ k ] = hs
}
hs . timestamps = append ( hs . timestamps , timestampStr )
hs . values = append ( hs . values , hitsStr )
mLock . Unlock ( )
}
blockResultPool . Put ( bb )
}
// Execute the query
if err := vlstorage . RunQuery ( ctx , tenantIDs , q , writeBlock ) ; err != nil {
httpserver . Errorf ( w , r , "cannot execute query [%s]: %s" , q , err )
return
}
// Write response
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteHitsSeries ( w , m )
}
// hitsSeries holds parallel slices of timestamps and hits values for a single series.
//
// It implements sort.Interface, ordering entries by timestamp string.
type hitsSeries struct {
	// timestamps contains the timestamp of every entry in string form.
	timestamps []string

	// values contains the hits value for the entry at the same index in timestamps.
	values []string
}

// sort orders the series entries by ascending timestamp string.
func (hs *hitsSeries) sort() {
	sort.Sort(hs)
}

// Len returns the number of entries in the series.
func (hs *hitsSeries) Len() int {
	return len(hs.timestamps)
}

// Swap exchanges entries i and j, keeping timestamps and values aligned.
func (hs *hitsSeries) Swap(i, j int) {
	ts, vs := hs.timestamps, hs.values
	ts[i], ts[j] = ts[j], ts[i]
	vs[i], vs[j] = vs[j], vs[i]
}

// Less reports whether the timestamp string at i sorts before the one at j.
func (hs *hitsSeries) Less(i, j int) bool {
	ts := hs.timestamps
	return ts[i] < ts[j]
}
// ProcessFieldNamesRequest handles /select/logsql/field_names request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names
func ProcessFieldNamesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
2024-05-15 02:55:44 +00:00
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
// Obtain field names for the given query
q . Optimize ( )
fieldNames , err := vlstorage . GetFieldNames ( ctx , tenantIDs , q )
if err != nil {
httpserver . Errorf ( w , r , "cannot obtain field names: %s" , err )
return
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , fieldNames )
2024-05-20 02:08:30 +00:00
}
// ProcessFieldValuesRequest handles /select/logsql/field_values request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values
func ProcessFieldValuesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
2024-05-15 02:55:44 +00:00
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
// Parse fieldName query arg
2024-05-22 19:01:20 +00:00
fieldName := r . FormValue ( "field" )
2024-05-20 02:08:30 +00:00
if fieldName == "" {
2024-05-22 19:01:20 +00:00
httpserver . Errorf ( w , r , "missing 'field' query arg" )
2024-05-20 02:08:30 +00:00
return
2024-05-15 02:55:44 +00:00
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-20 02:08:30 +00:00
if limit < 0 {
limit = 0
2024-05-14 01:05:03 +00:00
}
2024-05-20 02:08:30 +00:00
// Obtain unique values for the given field
2024-05-15 02:55:44 +00:00
q . Optimize ( )
2024-05-20 02:08:30 +00:00
values , err := vlstorage . GetFieldValues ( ctx , tenantIDs , q , fieldName , uint64 ( limit ) )
if err != nil {
httpserver . Errorf ( w , r , "cannot obtain values for field %q: %s" , fieldName , err )
return
}
2024-05-14 01:05:03 +00:00
2024-05-22 19:01:20 +00:00
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , values )
2024-05-22 19:01:20 +00:00
}
2024-05-25 19:36:16 +00:00
// ProcessStreamFieldNamesRequest processes /select/logsql/stream_field_names request.
2024-05-22 19:01:20 +00:00
//
2024-05-25 19:36:16 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names
func ProcessStreamFieldNamesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
2024-05-22 19:01:20 +00:00
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-25 19:36:16 +00:00
// Obtain stream field names for the given query
2024-05-22 19:01:20 +00:00
q . Optimize ( )
2024-05-25 19:36:16 +00:00
names , err := vlstorage . GetStreamFieldNames ( ctx , tenantIDs , q )
2024-05-22 19:01:20 +00:00
if err != nil {
2024-05-25 19:36:16 +00:00
httpserver . Errorf ( w , r , "cannot obtain stream field names: %s" , err )
2024-05-20 02:08:30 +00:00
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , names )
2024-05-22 19:01:20 +00:00
}
2024-05-25 19:36:16 +00:00
// ProcessStreamFieldValuesRequest processes /select/logsql/stream_field_values request.
2024-05-22 19:01:20 +00:00
//
2024-05-25 19:36:16 +00:00
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values
func ProcessStreamFieldValuesRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
2024-05-22 19:01:20 +00:00
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-05-25 19:36:16 +00:00
// Parse fieldName query arg
fieldName := r . FormValue ( "field" )
if fieldName == "" {
httpserver . Errorf ( w , r , "missing 'field' query arg" )
2024-05-22 19:01:20 +00:00
return
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if limit < 0 {
limit = 0
}
2024-05-25 19:36:16 +00:00
// Obtain stream field values for the given query and the given fieldName
2024-05-22 19:01:20 +00:00
q . Optimize ( )
2024-05-25 19:36:16 +00:00
values , err := vlstorage . GetStreamFieldValues ( ctx , tenantIDs , q , fieldName , uint64 ( limit ) )
2024-05-22 19:01:20 +00:00
if err != nil {
2024-05-25 19:36:16 +00:00
httpserver . Errorf ( w , r , "cannot obtain stream field values: %s" , err )
2024-05-22 19:01:20 +00:00
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , values )
2024-05-22 19:01:20 +00:00
}
2024-06-27 12:18:42 +00:00
// ProcessStreamIDsRequest processes /select/logsql/stream_ids request.
//
// It accepts an optional `limit` query arg; negative limits are treated as zero.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream_ids
func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	q, tenantIDs, err := parseCommonArgs(r)
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}

	// Parse limit query arg
	limit, err := httputils.GetInt(r, "limit")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	if limit < 0 {
		limit = 0
	}

	// Obtain streamIDs for the given query
	q.Optimize()
	streamIDs, err := vlstorage.GetStreamIDs(ctx, tenantIDs, q, uint64(limit))
	if err != nil {
		httpserver.Errorf(w, r, "cannot obtain stream_ids: %s", err)
		// Do not write results after the error response has been sent.
		return
	}

	// Write results
	w.Header().Set("Content-Type", "application/json")
	WriteValuesWithHitsJSON(w, streamIDs)
}
2024-05-22 19:01:20 +00:00
// ProcessStreamsRequest processes /select/logsql/streams request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
func ProcessStreamsRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if limit < 0 {
limit = 0
}
// Obtain streams for the given query
q . Optimize ( )
streams , err := vlstorage . GetStreams ( ctx , tenantIDs , q , uint64 ( limit ) )
if err != nil {
httpserver . Errorf ( w , r , "cannot obtain streams: %s" , err )
}
// Write results
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2024-05-24 01:06:55 +00:00
WriteValuesWithHitsJSON ( w , streams )
2024-05-20 02:08:30 +00:00
}
2024-06-27 12:18:42 +00:00
// ProcessLiveTailRequest processes live tailing request to /select/logsq/tail
func ProcessLiveTailRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
liveTailRequests . Inc ( )
defer liveTailRequests . Dec ( )
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
if ! q . CanLiveTail ( ) {
httpserver . Errorf ( w , r , "the query [%s] cannot be used in live tailing; see https://docs.victoriametrics.com/victorialogs/querying/#live-tailing for details" , q )
}
q . Optimize ( )
refreshIntervalMsecs , err := httputils . GetDuration ( r , "refresh_interval" , 1000 )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
refreshInterval := time . Millisecond * time . Duration ( refreshIntervalMsecs )
ctxWithCancel , cancel := context . WithCancel ( ctx )
tp := newTailProcessor ( cancel )
ticker := time . NewTicker ( refreshInterval )
defer ticker . Stop ( )
end := time . Now ( ) . UnixNano ( )
doneCh := ctxWithCancel . Done ( )
2024-06-27 13:05:57 +00:00
flusher , ok := w . ( http . Flusher )
if ! ok {
logger . Panicf ( "BUG: it is expected that http.ResponseWriter (%T) supports http.Flusher interface" , w )
}
2024-06-27 12:18:42 +00:00
for {
start := end - tailOffsetNsecs
end = time . Now ( ) . UnixNano ( )
qCopy := q . Clone ( )
qCopy . AddTimeFilter ( start , end )
if err := vlstorage . RunQuery ( ctxWithCancel , tenantIDs , qCopy , tp . writeBlock ) ; err != nil {
httpserver . Errorf ( w , r , "cannot execute tail query [%s]: %s" , q , err )
return
}
resultRows , err := tp . getTailRows ( )
if err != nil {
httpserver . Errorf ( w , r , "cannot get tail results for query [%q]: %s" , q , err )
return
}
2024-06-27 13:05:57 +00:00
if len ( resultRows ) > 0 {
WriteJSONRows ( w , resultRows )
flusher . Flush ( )
}
2024-06-27 12:18:42 +00:00
select {
case <- doneCh :
return
case <- ticker . C :
}
}
}
// liveTailRequests is incremented when ProcessLiveTailRequest starts
// and decremented when it returns.
//
// The metric name must not contain whitespace.
var liveTailRequests = metrics.NewCounter(`vl_live_tailing_requests`)

// tailOffsetNsecs is the number of nanoseconds subtracted from the previous
// query end in order to get the start of the next live tailing time range.
const tailOffsetNsecs = 5e9
// logRow is a single log entry buffered by tailProcessor.
type logRow struct {
	// timestamp is the entry timestamp; rows are ordered by it.
	timestamp int64

	// fields contains the entry fields.
	fields []logstorage.Field
}

// sortLogRows sorts rows in place by ascending timestamp.
func sortLogRows(rows []logRow) {
	byTimestamp := func(a, b int) bool {
		return rows[a].timestamp < rows[b].timestamp
	}
	sort.Slice(rows, byTimestamp)
}
// tailProcessor accumulates rows written via writeBlock and returns
// the not-yet-returned ones via getTailRows.
type tailProcessor struct {
	// cancel is called by writeBlock when a block without the _time field is seen.
	cancel func()

	// mu is held by writeBlock while it accesses the fields below.
	mu sync.Mutex

	// perStreamRows contains buffered rows grouped by the _stream_id field value.
	perStreamRows map[string][]logRow

	// lastTimestamps contains the timestamp of the last row returned
	// by getTailRows for every _stream_id.
	lastTimestamps map[string]int64

	// err is the first error registered by writeBlock.
	err error
}

// newTailProcessor returns a tailProcessor with empty state,
// which calls cancel when writeBlock registers an error.
func newTailProcessor(cancel func()) *tailProcessor {
	tp := &tailProcessor{
		cancel: cancel,
	}
	tp.perStreamRows = make(map[string][]logRow)
	tp.lastTimestamps = make(map[string]int64)
	return tp
}
func ( tp * tailProcessor ) writeBlock ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
if len ( timestamps ) == 0 {
return
}
tp . mu . Lock ( )
defer tp . mu . Unlock ( )
if tp . err != nil {
return
}
2024-06-27 13:05:57 +00:00
// Make sure columns contain _time field, since it is needed for proper tail work.
2024-06-27 12:18:42 +00:00
hasTime := false
for _ , c := range columns {
if c . Name == "_time" {
hasTime = true
2024-06-27 13:05:57 +00:00
break
2024-06-27 12:18:42 +00:00
}
}
if ! hasTime {
tp . err = fmt . Errorf ( "missing _time field" )
tp . cancel ( )
return
}
// Copy block rows to tp.perStreamRows
for i , timestamp := range timestamps {
streamID := ""
fields := make ( [ ] logstorage . Field , len ( columns ) )
for j , c := range columns {
name := strings . Clone ( c . Name )
value := strings . Clone ( c . Values [ i ] )
fields [ j ] = logstorage . Field {
Name : name ,
Value : value ,
}
if name == "_stream_id" {
streamID = value
}
}
2024-06-27 13:05:57 +00:00
2024-06-27 12:18:42 +00:00
tp . perStreamRows [ streamID ] = append ( tp . perStreamRows [ streamID ] , logRow {
timestamp : timestamp ,
fields : fields ,
} )
}
}
func ( tp * tailProcessor ) getTailRows ( ) ( [ ] [ ] logstorage . Field , error ) {
if tp . err != nil {
return nil , tp . err
}
var resultRows [ ] logRow
for streamID , rows := range tp . perStreamRows {
sortLogRows ( rows )
lastTimestamp , ok := tp . lastTimestamps [ streamID ]
if ok {
// Skip already written rows
2024-06-27 13:05:57 +00:00
for len ( rows ) > 0 && rows [ 0 ] . timestamp <= lastTimestamp {
rows = rows [ 1 : ]
2024-06-27 12:18:42 +00:00
}
}
2024-06-27 13:05:57 +00:00
if len ( rows ) > 0 {
resultRows = append ( resultRows , rows ... )
tp . lastTimestamps [ streamID ] = rows [ len ( rows ) - 1 ] . timestamp
}
2024-06-27 12:18:42 +00:00
}
clear ( tp . perStreamRows )
sortLogRows ( resultRows )
tailRows := make ( [ ] [ ] logstorage . Field , len ( resultRows ) )
for i , row := range resultRows {
tailRows [ i ] = row . fields
}
return tailRows , nil
}
2024-05-20 02:08:30 +00:00
// ProcessQueryRequest handles /select/logsql/query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#http-api
func ProcessQueryRequest ( ctx context . Context , w http . ResponseWriter , r * http . Request ) {
q , tenantIDs , err := parseCommonArgs ( r )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
// Parse limit query arg
limit , err := httputils . GetInt ( r , "limit" )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
2024-06-03 22:59:25 +00:00
bw := getBufferedWriter ( w )
defer func ( ) {
bw . FlushIgnoreErrors ( )
putBufferedWriter ( bw )
} ( )
w . Header ( ) . Set ( "Content-Type" , "application/stream+json" )
2024-05-20 02:08:30 +00:00
if limit > 0 {
2024-06-03 22:59:25 +00:00
if q . CanReturnLastNResults ( ) {
rows , err := getLastNQueryResults ( ctx , tenantIDs , q , limit )
if err != nil {
httpserver . Errorf ( w , r , "%s" , err )
return
}
bb := blockResultPool . Get ( )
b := bb . B
for i := range rows {
b = logstorage . MarshalFieldsToJSON ( b [ : 0 ] , rows [ i ] . fields )
b = append ( b , '\n' )
bw . WriteIgnoreErrors ( b )
}
bb . B = b
blockResultPool . Put ( bb )
return
}
2024-05-20 02:08:30 +00:00
q . AddPipeLimit ( uint64 ( limit ) )
}
2024-06-11 15:50:32 +00:00
q . Optimize ( )
2024-05-12 14:33:29 +00:00
writeBlock := func ( _ uint , timestamps [ ] int64 , columns [ ] logstorage . BlockColumn ) {
2024-05-20 02:08:30 +00:00
if len ( columns ) == 0 || len ( columns [ 0 ] . Values ) == 0 {
2024-02-18 21:01:34 +00:00
return
2023-06-20 05:55:12 +00:00
}
2024-02-18 21:01:34 +00:00
2023-06-20 05:55:12 +00:00
bb := blockResultPool . Get ( )
2024-05-12 14:33:29 +00:00
for i := range timestamps {
WriteJSONRow ( bb , columns , i )
2023-06-20 05:55:12 +00:00
}
2024-05-14 01:05:03 +00:00
bw . WriteIgnoreErrors ( bb . B )
2024-02-18 21:01:34 +00:00
blockResultPool . Put ( bb )
2024-05-12 14:33:29 +00:00
}
2024-06-03 22:59:25 +00:00
if err := vlstorage . RunQuery ( ctx , tenantIDs , q , writeBlock ) ; err != nil {
httpserver . Errorf ( w , r , "cannot execute query [%s]: %s" , q , err )
2024-06-27 12:18:42 +00:00
return
2024-06-03 22:59:25 +00:00
}
}
2024-05-12 14:33:29 +00:00
2024-06-03 22:59:25 +00:00
// blockResultPool is a pool of byte buffers reused by the request handlers
// in this file for building per-block output and per-row keys.
var blockResultPool bytesutil . ByteBufferPool
2024-05-12 14:33:29 +00:00
2024-06-03 22:59:25 +00:00
// row is a single query result row collected by getQueryResultsWithLimit.
type row struct {
// timestamp is the row timestamp; getLastNRows sorts rows by it.
timestamp int64
// fields contains copies of the column names and values for the row.
fields [ ] logstorage . Field
}
func getLastNQueryResults ( ctx context . Context , tenantIDs [ ] logstorage . TenantID , q * logstorage . Query , limit int ) ( [ ] row , error ) {
2024-06-05 01:18:12 +00:00
limitUpper := 2 * limit
q . AddPipeLimit ( uint64 ( limitUpper ) )
2024-06-03 22:59:25 +00:00
q . Optimize ( )
2024-06-05 01:18:12 +00:00
rows , err := getQueryResultsWithLimit ( ctx , tenantIDs , q , limitUpper )
2024-05-12 14:33:29 +00:00
if err != nil {
2024-06-03 22:59:25 +00:00
return nil , err
}
2024-06-05 01:18:12 +00:00
if len ( rows ) < limitUpper {
// Fast path - the requested time range contains up to limitUpper rows.
rows = getLastNRows ( rows , limit )
2024-06-03 22:59:25 +00:00
return rows , nil
}
2024-06-05 01:18:12 +00:00
// Slow path - search for the time range containing up to limitUpper rows.
2024-06-03 22:59:25 +00:00
start , end := q . GetFilterTimeRange ( )
d := end / 2 - start / 2
start += d
qOrig := q
for {
q = qOrig . Clone ( )
q . AddTimeFilter ( start , end )
2024-06-05 01:18:12 +00:00
rows , err := getQueryResultsWithLimit ( ctx , tenantIDs , q , limitUpper )
2024-06-03 22:59:25 +00:00
if err != nil {
return nil , err
}
2024-06-05 01:18:12 +00:00
if len ( rows ) >= limit && len ( rows ) < limitUpper || d == 0 {
rows = getLastNRows ( rows , limit )
2024-06-03 22:59:25 +00:00
return rows , nil
}
lastBit := d & 1
d /= 2
if len ( rows ) > limit {
start += d
} else {
start -= d + lastBit
}
2024-05-12 14:33:29 +00:00
}
2023-06-20 05:55:12 +00:00
}
2024-06-05 01:18:12 +00:00
func getLastNRows ( rows [ ] row , limit int ) [ ] row {
2024-06-03 22:59:25 +00:00
sort . Slice ( rows , func ( i , j int ) bool {
return rows [ i ] . timestamp < rows [ j ] . timestamp
} )
2024-06-05 01:18:12 +00:00
if len ( rows ) > limit {
rows = rows [ len ( rows ) - limit : ]
}
return rows
2024-06-03 22:59:25 +00:00
}
// getQueryResultsWithLimit runs q and returns copies of the result rows.
//
// The query context is canceled as soon as at least limit rows are collected.
func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
	ctxWithCancel, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		mu        sync.Mutex
		collected []row
	)
	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
		mu.Lock()
		defer mu.Unlock()

		for rowIdx := range timestamps {
			// Clone names and values, since they are retained after writeBlock returns.
			fields := make([]logstorage.Field, len(columns))
			for colIdx := range columns {
				fields[colIdx] = logstorage.Field{
					Name:  strings.Clone(columns[colIdx].Name),
					Value: strings.Clone(columns[colIdx].Values[rowIdx]),
				}
			}
			collected = append(collected, row{
				timestamp: timestamps[rowIdx],
				fields:    fields,
			})
		}

		if len(collected) >= limit {
			cancel()
		}
	}

	err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
	if err != nil {
		return nil, err
	}
	return collected, nil
}
2024-05-15 02:55:44 +00:00
2024-05-20 02:08:30 +00:00
// parseCommonArgs parses the args shared by the handlers in this file.
//
// It returns the query parsed from the `query` arg and the tenant IDs
// obtained from the request. If `start` and/or `end` args are set, then
// the corresponding time filter is added to the query; the missing bound
// is replaced with the minimum or the maximum int64 value respectively.
func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, error) {
	// Extract tenantID
	tenantID, err := logstorage.GetTenantIDFromRequest(r)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot obtain tenantID: %w", err)
	}
	tenantIDs := []logstorage.TenantID{tenantID}

	// Parse query
	qStr := r.FormValue("query")
	q, err := logstorage.ParseQuery(qStr)
	if err != nil {
		// Wrap the error, so callers can inspect it with errors.Is / errors.As.
		return nil, nil, fmt.Errorf("cannot parse query [%s]: %w", qStr, err)
	}

	// Parse optional start and end args
	start, okStart, err := getTimeNsec(r, "start")
	if err != nil {
		return nil, nil, err
	}
	end, okEnd, err := getTimeNsec(r, "end")
	if err != nil {
		return nil, nil, err
	}
	if okStart || okEnd {
		if !okStart {
			start = math.MinInt64
		}
		if !okEnd {
			end = math.MaxInt64
		}
		q.AddTimeFilter(start, end)
	}

	return q, tenantIDs, nil
}
2024-05-15 02:55:44 +00:00
func getTimeNsec ( r * http . Request , argName string ) ( int64 , bool , error ) {
s := r . FormValue ( argName )
if s == "" {
return 0 , false , nil
}
2024-06-03 22:59:25 +00:00
currentTimestamp := time . Now ( ) . UnixNano ( )
nsecs , err := promutils . ParseTimeAt ( s , currentTimestamp )
2024-05-15 02:55:44 +00:00
if err != nil {
return 0 , false , fmt . Errorf ( "cannot parse %s=%s: %w" , argName , s , err )
}
2024-06-03 22:59:25 +00:00
return nsecs , true , nil
2024-05-15 02:55:44 +00:00
}