VictoriaMetrics/app/vlselect/logsql/logsql.go

185 lines
4.6 KiB
Go
Raw Normal View History

package logsql
import (
2024-04-26 21:47:50 +00:00
"context"
2024-05-15 02:42:03 +00:00
"fmt"
"math"
"net/http"
2024-05-18 21:07:51 +00:00
"slices"
2024-05-15 02:42:03 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
2024-05-15 02:42:03 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
2024-05-18 21:07:51 +00:00
// ProcessFieldNamesRequest handles /select/logsql/field_names request.
func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Obtain field names for the given query
q.Optimize()
fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain field names: %w", err)
return
}
slices.Sort(fieldNames)
// Write results
w.Header().Set("Content-Type", "application/json")
WriteFieldNamesResponse(w, fieldNames)
}
2024-05-18 17:40:23 +00:00
// ProcessFieldValuesRequest handles /select/logsql/field_values request.
func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-18 17:40:23 +00:00
// Parse fieldName query arg
fieldName := r.FormValue("field_name")
if fieldName == "" {
httpserver.Errorf(w, r, "missing 'field_name' query arg")
return
}
2024-05-18 17:40:23 +00:00
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
2024-05-15 02:42:03 +00:00
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-18 17:40:23 +00:00
if limit < 0 {
limit = 0
}
// Obtain unique values for the given field
q.Optimize()
2024-05-18 21:07:51 +00:00
values, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
2024-05-15 02:42:03 +00:00
if err != nil {
2024-05-18 17:40:23 +00:00
httpserver.Errorf(w, r, "cannot obtain values for field %q: %s", fieldName, err)
2024-05-15 02:42:03 +00:00
return
}
2024-05-18 17:40:23 +00:00
2024-05-18 21:07:51 +00:00
if limit == 0 || len(values) < limit {
// Sort values only if their number is below the limit.
// Otherwise there is little sense in sorting, since the query may return
// different subset of values on every execution.
slices.Sort(values)
}
2024-05-18 17:40:23 +00:00
// Write results
w.Header().Set("Content-Type", "application/json")
WriteFieldValuesResponse(w, values)
}
// ProcessQueryRequest handles /select/logsql/query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#http-api
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
2024-05-15 02:42:03 +00:00
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
2024-05-14 01:04:11 +00:00
if limit > 0 {
q.AddPipeLimit(uint64(limit))
}
2024-04-26 21:47:50 +00:00
2024-05-14 01:04:11 +00:00
bw := getBufferedWriter(w)
2024-04-26 21:47:50 +00:00
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
if len(columns) == 0 {
return
}
bb := blockResultPool.Get()
2024-04-26 21:47:50 +00:00
for i := range timestamps {
WriteJSONRow(bb, columns, i)
}
2024-05-14 01:04:11 +00:00
bw.WriteIgnoreErrors(bb.B)
blockResultPool.Put(bb)
2024-04-26 21:47:50 +00:00
}
2024-05-18 17:40:23 +00:00
w.Header().Set("Content-Type", "application/stream+json")
q.Optimize()
2024-05-14 01:04:11 +00:00
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
2024-04-26 21:47:50 +00:00
2024-05-14 01:04:11 +00:00
bw.FlushIgnoreErrors()
putBufferedWriter(bw)
2024-04-27 20:08:03 +00:00
if err != nil {
2024-05-18 17:40:23 +00:00
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
2024-04-27 20:08:03 +00:00
}
}
var blockResultPool bytesutil.ByteBufferPool
2024-05-15 02:42:03 +00:00
2024-05-18 17:40:23 +00:00
func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, error) {
// Extract tenantID
tenantID, err := logstorage.GetTenantIDFromRequest(r)
if err != nil {
return nil, nil, fmt.Errorf("cannot obtain tenanID: %w", err)
}
tenantIDs := []logstorage.TenantID{tenantID}
// Parse query
qStr := r.FormValue("query")
q, err := logstorage.ParseQuery(qStr)
if err != nil {
return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
}
// Parse optional start and end args
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
return nil, nil, err
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
return nil, nil, err
}
if okStart || okEnd {
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}
q.AddTimeFilter(start, end)
}
return q, tenantIDs, nil
}
2024-05-15 02:42:03 +00:00
func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
s := r.FormValue(argName)
if s == "" {
return 0, false, nil
}
currentTimestamp := float64(time.Now().UnixNano()) / 1e9
secs, err := promutils.ParseTimeAt(s, currentTimestamp)
if err != nil {
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
}
return int64(secs * 1e9), true, nil
}