mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vlselect/logsql: sort query results by _time if their summary size doesnt exceed -select.maxSortBufferSize
This commit is contained in:
parent
94f516df43
commit
7346bb4f03
10 changed files with 586 additions and 123 deletions
|
@ -9,7 +9,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
|
@ -19,11 +18,11 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
"github.com/valyala/fastjson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -240,28 +239,21 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
|
||||||
return false, fmt.Errorf(`missing log message after the "create" or "index" command`)
|
return false, fmt.Errorf(`missing log message after the "create" or "index" command`)
|
||||||
}
|
}
|
||||||
line = sc.Bytes()
|
line = sc.Bytes()
|
||||||
pctx := getParserCtx()
|
p := logjson.GetParser()
|
||||||
if err := pctx.parseLogMessage(line); err != nil {
|
if err := p.ParseLogMessage(line); err != nil {
|
||||||
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp, err := extractTimestampFromFields(timeField, pctx.fields)
|
timestamp, err := extractTimestampFromFields(timeField, p.Fields)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
return false, fmt.Errorf("cannot parse timestamp: %w", err)
|
||||||
}
|
}
|
||||||
updateMessageFieldName(msgField, pctx.fields)
|
updateMessageFieldName(msgField, p.Fields)
|
||||||
processLogMessage(timestamp, pctx.fields)
|
processLogMessage(timestamp, p.Fields)
|
||||||
putParserCtx(pctx)
|
logjson.PutParser(p)
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var parserPool fastjson.ParserPool
|
|
||||||
|
|
||||||
var (
|
|
||||||
invalidTimestampLogger = logger.WithThrottler("invalidTimestampLogger", 5*time.Second)
|
|
||||||
invalidJSONLineLogger = logger.WithThrottler("invalidJSONLineLogger", 5*time.Second)
|
|
||||||
)
|
|
||||||
|
|
||||||
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
|
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
|
||||||
for i := range fields {
|
for i := range fields {
|
||||||
f := &fields[i]
|
f := &fields[i]
|
||||||
|
@ -291,102 +283,6 @@ func updateMessageFieldName(msgField string, fields []logstorage.Field) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type parserCtx struct {
|
|
||||||
p fastjson.Parser
|
|
||||||
buf []byte
|
|
||||||
prefixBuf []byte
|
|
||||||
fields []logstorage.Field
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pctx *parserCtx) reset() {
|
|
||||||
pctx.buf = pctx.buf[:0]
|
|
||||||
pctx.prefixBuf = pctx.prefixBuf[:0]
|
|
||||||
|
|
||||||
fields := pctx.fields
|
|
||||||
for i := range fields {
|
|
||||||
lf := &fields[i]
|
|
||||||
lf.Name = ""
|
|
||||||
lf.Value = ""
|
|
||||||
}
|
|
||||||
pctx.fields = fields[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
func getParserCtx() *parserCtx {
|
|
||||||
v := parserCtxPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &parserCtx{}
|
|
||||||
}
|
|
||||||
return v.(*parserCtx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putParserCtx(pctx *parserCtx) {
|
|
||||||
pctx.reset()
|
|
||||||
parserCtxPool.Put(pctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
var parserCtxPool sync.Pool
|
|
||||||
|
|
||||||
func (pctx *parserCtx) parseLogMessage(msg []byte) error {
|
|
||||||
s := bytesutil.ToUnsafeString(msg)
|
|
||||||
v, err := pctx.p.Parse(s)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot parse json: %w", err)
|
|
||||||
}
|
|
||||||
if t := v.Type(); t != fastjson.TypeObject {
|
|
||||||
return fmt.Errorf("expecting json dictionary; got %s", t)
|
|
||||||
}
|
|
||||||
pctx.reset()
|
|
||||||
pctx.fields, pctx.buf, pctx.prefixBuf = appendLogFields(pctx.fields, pctx.buf, pctx.prefixBuf, v)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
|
|
||||||
o := v.GetObject()
|
|
||||||
o.Visit(func(k []byte, v *fastjson.Value) {
|
|
||||||
t := v.Type()
|
|
||||||
switch t {
|
|
||||||
case fastjson.TypeNull:
|
|
||||||
// Skip nulls
|
|
||||||
case fastjson.TypeObject:
|
|
||||||
// Flatten nested JSON objects.
|
|
||||||
// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
|
|
||||||
prefixLen := len(prefixBuf)
|
|
||||||
prefixBuf = append(prefixBuf, k...)
|
|
||||||
prefixBuf = append(prefixBuf, '.')
|
|
||||||
dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
|
|
||||||
prefixBuf = prefixBuf[:prefixLen]
|
|
||||||
case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
|
|
||||||
// Convert JSON arrays, numbers, true and false values to their string representation
|
|
||||||
dstBufLen := len(dstBuf)
|
|
||||||
dstBuf = v.MarshalTo(dstBuf)
|
|
||||||
value := dstBuf[dstBufLen:]
|
|
||||||
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
|
||||||
case fastjson.TypeString:
|
|
||||||
// Decode JSON strings
|
|
||||||
dstBufLen := len(dstBuf)
|
|
||||||
dstBuf = append(dstBuf, v.GetStringBytes()...)
|
|
||||||
value := dstBuf[dstBufLen:]
|
|
||||||
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
|
||||||
default:
|
|
||||||
logger.Panicf("BUG: unexpected JSON type: %s", t)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return dst, dstBuf, prefixBuf
|
|
||||||
}
|
|
||||||
|
|
||||||
func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) {
|
|
||||||
dstBufLen := len(dstBuf)
|
|
||||||
dstBuf = append(dstBuf, prefixBuf...)
|
|
||||||
dstBuf = append(dstBuf, k...)
|
|
||||||
name := dstBuf[dstBufLen:]
|
|
||||||
|
|
||||||
dst = append(dst, logstorage.Field{
|
|
||||||
Name: bytesutil.ToUnsafeString(name),
|
|
||||||
Value: bytesutil.ToUnsafeString(value),
|
|
||||||
})
|
|
||||||
return dst, dstBuf
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseElasticsearchTimestamp(s string) (int64, error) {
|
func parseElasticsearchTimestamp(s string) (int64, error) {
|
||||||
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
|
||||||
// Try parsing timestamp in milliseconds
|
// Try parsing timestamp in milliseconds
|
||||||
|
|
|
@ -4,12 +4,18 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
|
||||||
|
"if their summary size doesn't exceed this value; otherwise query results are streamed in the response without sorting; "+
|
||||||
|
"too big value for this flag may result in high memory usage, since the sorting is performed in memory")
|
||||||
|
)
|
||||||
|
|
||||||
// ProcessQueryRequest handles /select/logsql/query request
|
// ProcessQueryRequest handles /select/logsql/query request
|
||||||
func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
|
func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
|
||||||
// Extract tenantID
|
// Extract tenantID
|
||||||
|
@ -27,9 +33,8 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
||||||
|
|
||||||
bw := bufferedwriter.Get(w)
|
sw := getSortWriter()
|
||||||
defer bufferedwriter.Put(bw)
|
sw.Init(w, maxSortBufferSize.IntN())
|
||||||
|
|
||||||
tenantIDs := []logstorage.TenantID{tenantID}
|
tenantIDs := []logstorage.TenantID{tenantID}
|
||||||
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
|
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
|
||||||
if len(columns) == 0 {
|
if len(columns) == 0 {
|
||||||
|
@ -41,13 +46,11 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
|
||||||
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
|
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
|
||||||
WriteJSONRow(bb, columns, rowIdx)
|
WriteJSONRow(bb, columns, rowIdx)
|
||||||
}
|
}
|
||||||
// Do not check for error here, since the only valid error is when the client
|
sw.MustWrite(bb.B)
|
||||||
// closes the connection during Write() call. There is no need in logging this error,
|
|
||||||
// since it may be too verbose and it doesn't give any actionable info.
|
|
||||||
_, _ = bw.Write(bb.B)
|
|
||||||
blockResultPool.Put(bb)
|
blockResultPool.Put(bb)
|
||||||
})
|
})
|
||||||
_ = bw.Flush()
|
sw.FinalFlush()
|
||||||
|
putSortWriter(sw)
|
||||||
}
|
}
|
||||||
|
|
||||||
var blockResultPool bytesutil.ByteBufferPool
|
var blockResultPool bytesutil.ByteBufferPool
|
||||||
|
|
|
@ -17,4 +17,25 @@
|
||||||
}{% newline %}
|
}{% newline %}
|
||||||
{% endfunc %}
|
{% endfunc %}
|
||||||
|
|
||||||
|
// JSONRows prints formatted rows
|
||||||
|
{% func JSONRows(rows [][]logstorage.Field) %}
|
||||||
|
{% if len(rows) == 0 %}
|
||||||
|
{% return %}
|
||||||
|
{% endif %}
|
||||||
|
{% for _, fields := range rows %}
|
||||||
|
{
|
||||||
|
{% if len(fields) > 0 %}
|
||||||
|
{% code
|
||||||
|
f := fields[0]
|
||||||
|
fields = fields[1:]
|
||||||
|
%}
|
||||||
|
{%q= f.Name %}:{%q= f.Value %}
|
||||||
|
{% for _, f := range fields %}
|
||||||
|
,{%q= f.Name %}:{%q= f.Value %}
|
||||||
|
{% endfor %}
|
||||||
|
{% endif %}
|
||||||
|
}{% newline %}
|
||||||
|
{% endfor %}
|
||||||
|
{% endfunc %}
|
||||||
|
|
||||||
{% endstripspace %}
|
{% endstripspace %}
|
||||||
|
|
|
@ -88,3 +88,79 @@ func JSONRow(columns []logstorage.BlockColumn, rowIdx int) string {
|
||||||
return qs422016
|
return qs422016
|
||||||
//line app/vlselect/logsql/query_response.qtpl:18
|
//line app/vlselect/logsql/query_response.qtpl:18
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// JSONRows prints formatted rows
|
||||||
|
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:21
|
||||||
|
func StreamJSONRows(qw422016 *qt422016.Writer, rows [][]logstorage.Field) {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:22
|
||||||
|
if len(rows) == 0 {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:23
|
||||||
|
return
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:24
|
||||||
|
}
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:25
|
||||||
|
for _, fields := range rows {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:25
|
||||||
|
qw422016.N().S(`{`)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:27
|
||||||
|
if len(fields) > 0 {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:29
|
||||||
|
f := fields[0]
|
||||||
|
fields = fields[1:]
|
||||||
|
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:32
|
||||||
|
qw422016.N().Q(f.Name)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:32
|
||||||
|
qw422016.N().S(`:`)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:32
|
||||||
|
qw422016.N().Q(f.Value)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:33
|
||||||
|
for _, f := range fields {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:33
|
||||||
|
qw422016.N().S(`,`)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:34
|
||||||
|
qw422016.N().Q(f.Name)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:34
|
||||||
|
qw422016.N().S(`:`)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:34
|
||||||
|
qw422016.N().Q(f.Value)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:35
|
||||||
|
}
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:36
|
||||||
|
}
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:36
|
||||||
|
qw422016.N().S(`}`)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:37
|
||||||
|
qw422016.N().S(`
|
||||||
|
`)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:38
|
||||||
|
}
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
}
|
||||||
|
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
func WriteJSONRows(qq422016 qtio422016.Writer, rows [][]logstorage.Field) {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
StreamJSONRows(qw422016, rows)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
qt422016.ReleaseWriter(qw422016)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
}
|
||||||
|
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
func JSONRows(rows [][]logstorage.Field) string {
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
qb422016 := qt422016.AcquireByteBuffer()
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
WriteJSONRows(qb422016, rows)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
qs422016 := string(qb422016.B)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
qt422016.ReleaseByteBuffer(qb422016)
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
return qs422016
|
||||||
|
//line app/vlselect/logsql/query_response.qtpl:39
|
||||||
|
}
|
||||||
|
|
222
app/vlselect/logsql/sort_writer.go
Normal file
222
app/vlselect/logsql/sort_writer.go
Normal file
|
@ -0,0 +1,222 @@
|
||||||
|
package logsql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getSortWriter() *sortWriter {
|
||||||
|
v := sortWriterPool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return &sortWriter{}
|
||||||
|
}
|
||||||
|
return v.(*sortWriter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func putSortWriter(sw *sortWriter) {
|
||||||
|
sw.reset()
|
||||||
|
sortWriterPool.Put(sw)
|
||||||
|
}
|
||||||
|
|
||||||
|
var sortWriterPool sync.Pool
|
||||||
|
|
||||||
|
// sortWriter expects JSON line stream to be written to it.
|
||||||
|
//
|
||||||
|
// It buffers the incoming data until its size reaches maxBufLen.
|
||||||
|
// Then it streams the buffered data and all the incoming data to w.
|
||||||
|
//
|
||||||
|
// The FinalFlush() must be called when all the data is written.
|
||||||
|
// If the buf isn't empty at FinalFlush() call, then the buffered data
|
||||||
|
// is sorted by _time field.
|
||||||
|
type sortWriter struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
w io.Writer
|
||||||
|
maxBufLen int
|
||||||
|
buf []byte
|
||||||
|
bufFlushed bool
|
||||||
|
|
||||||
|
hasErr bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *sortWriter) reset() {
|
||||||
|
sw.w = nil
|
||||||
|
sw.maxBufLen = 0
|
||||||
|
sw.buf = sw.buf[:0]
|
||||||
|
sw.bufFlushed = false
|
||||||
|
sw.hasErr = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
|
||||||
|
sw.reset()
|
||||||
|
|
||||||
|
sw.w = w
|
||||||
|
sw.maxBufLen = maxBufLen
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *sortWriter) MustWrite(p []byte) {
|
||||||
|
sw.mu.Lock()
|
||||||
|
defer sw.mu.Unlock()
|
||||||
|
|
||||||
|
if sw.hasErr {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if sw.bufFlushed {
|
||||||
|
if _, err := sw.w.Write(p); err != nil {
|
||||||
|
sw.hasErr = true
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(sw.buf)+len(p) < sw.maxBufLen {
|
||||||
|
sw.buf = append(sw.buf, p...)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sw.bufFlushed = true
|
||||||
|
if len(sw.buf) > 0 {
|
||||||
|
if _, err := sw.w.Write(sw.buf); err != nil {
|
||||||
|
sw.hasErr = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sw.buf = sw.buf[:0]
|
||||||
|
}
|
||||||
|
if _, err := sw.w.Write(p); err != nil {
|
||||||
|
sw.hasErr = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *sortWriter) FinalFlush() {
|
||||||
|
if sw.hasErr || sw.bufFlushed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rs := getRowsSorter()
|
||||||
|
rs.parseRows(sw.buf)
|
||||||
|
rs.sort()
|
||||||
|
WriteJSONRows(sw.w, rs.rows)
|
||||||
|
putRowsSorter(rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRowsSorter() *rowsSorter {
|
||||||
|
v := rowsSorterPool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return &rowsSorter{}
|
||||||
|
}
|
||||||
|
return v.(*rowsSorter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func putRowsSorter(rs *rowsSorter) {
|
||||||
|
rs.reset()
|
||||||
|
rowsSorterPool.Put(rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
var rowsSorterPool sync.Pool
|
||||||
|
|
||||||
|
type rowsSorter struct {
|
||||||
|
buf []byte
|
||||||
|
fieldsBuf []logstorage.Field
|
||||||
|
rows [][]logstorage.Field
|
||||||
|
times []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *rowsSorter) reset() {
|
||||||
|
rs.buf = rs.buf[:0]
|
||||||
|
|
||||||
|
fieldsBuf := rs.fieldsBuf
|
||||||
|
for i := range fieldsBuf {
|
||||||
|
fieldsBuf[i].Reset()
|
||||||
|
}
|
||||||
|
rs.fieldsBuf = fieldsBuf[:0]
|
||||||
|
|
||||||
|
rows := rs.rows
|
||||||
|
for i := range rows {
|
||||||
|
rows[i] = nil
|
||||||
|
}
|
||||||
|
rs.rows = rows[:0]
|
||||||
|
|
||||||
|
times := rs.times
|
||||||
|
for i := range times {
|
||||||
|
times[i] = ""
|
||||||
|
}
|
||||||
|
rs.times = times[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *rowsSorter) parseRows(src []byte) {
|
||||||
|
rs.reset()
|
||||||
|
|
||||||
|
buf := rs.buf
|
||||||
|
fieldsBuf := rs.fieldsBuf
|
||||||
|
rows := rs.rows
|
||||||
|
times := rs.times
|
||||||
|
|
||||||
|
p := logjson.GetParser()
|
||||||
|
for len(src) > 0 {
|
||||||
|
var line []byte
|
||||||
|
n := bytes.IndexByte(src, '\n')
|
||||||
|
if n < 0 {
|
||||||
|
line = src
|
||||||
|
src = nil
|
||||||
|
} else {
|
||||||
|
line = src[:n]
|
||||||
|
src = src[n+1:]
|
||||||
|
}
|
||||||
|
if len(line) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
p.ParseLogMessage(line)
|
||||||
|
|
||||||
|
timeValue := ""
|
||||||
|
fieldsBufLen := len(fieldsBuf)
|
||||||
|
for _, f := range p.Fields {
|
||||||
|
bufLen := len(buf)
|
||||||
|
buf = append(buf, f.Name...)
|
||||||
|
name := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
|
||||||
|
bufLen = len(buf)
|
||||||
|
buf = append(buf, f.Value...)
|
||||||
|
value := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
|
||||||
|
fieldsBuf = append(fieldsBuf, logstorage.Field{
|
||||||
|
Name: name,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
|
||||||
|
if name == "_time" {
|
||||||
|
timeValue = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rows = append(rows, fieldsBuf[fieldsBufLen:])
|
||||||
|
times = append(times, timeValue)
|
||||||
|
}
|
||||||
|
logjson.PutParser(p)
|
||||||
|
|
||||||
|
rs.buf = buf
|
||||||
|
rs.fieldsBuf = fieldsBuf
|
||||||
|
rs.rows = rows
|
||||||
|
rs.times = times
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *rowsSorter) Len() int {
|
||||||
|
return len(rs.rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *rowsSorter) Less(i, j int) bool {
|
||||||
|
times := rs.times
|
||||||
|
return times[i] < times[j]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *rowsSorter) Swap(i, j int) {
|
||||||
|
times := rs.times
|
||||||
|
rows := rs.rows
|
||||||
|
times[i], times[j] = times[j], times[i]
|
||||||
|
rows[i], rows[j] = rows[j], rows[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *rowsSorter) sort() {
|
||||||
|
sort.Sort(rs)
|
||||||
|
}
|
39
app/vlselect/logsql/sort_writer_test.go
Normal file
39
app/vlselect/logsql/sort_writer_test.go
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
package logsql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSortWriter(t *testing.T) {
|
||||||
|
f := func(maxBufLen int, data string, expectedResult string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
var bb bytes.Buffer
|
||||||
|
sw := getSortWriter()
|
||||||
|
sw.Init(&bb, maxBufLen)
|
||||||
|
|
||||||
|
for _, s := range strings.Split(data, "\n") {
|
||||||
|
sw.MustWrite([]byte(s + "\n"))
|
||||||
|
}
|
||||||
|
sw.FinalFlush()
|
||||||
|
putSortWriter(sw)
|
||||||
|
|
||||||
|
result := bb.String()
|
||||||
|
if result != expectedResult {
|
||||||
|
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f(100, "", "")
|
||||||
|
f(100, "{}", "{}\n")
|
||||||
|
|
||||||
|
data := `{"_time":"def","_msg":"xxx"}
|
||||||
|
{"_time":"abc","_msg":"foo"}`
|
||||||
|
resultExpected := `{"_time":"abc","_msg":"foo"}
|
||||||
|
{"_time":"def","_msg":"xxx"}
|
||||||
|
`
|
||||||
|
f(100, data, resultExpected)
|
||||||
|
f(10, data, data+"\n")
|
||||||
|
}
|
|
@ -1058,8 +1058,9 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
|
||||||
|
|
||||||
## Sorting
|
## Sorting
|
||||||
|
|
||||||
By default VictoriaLogs doesn't sort the returned results because of performance and efficiency concerns
|
By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
|
||||||
described [here](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
|
if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to one megabytes).
|
||||||
|
Otherwise sorting is skipped because of performance and efficiency concerns described [here](https://docs.victoriametrics.com/VictoriaLogs/querying/).
|
||||||
|
|
||||||
It is possible to sort the [selected log entries](#filters) at client side with `sort` Unix command
|
It is possible to sort the [selected log entries](#filters) at client side with `sort` Unix command
|
||||||
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
|
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
|
||||||
|
|
|
@ -31,7 +31,9 @@ The response can be interrupted at any time by closing the connection to Victori
|
||||||
This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
|
This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
|
||||||
See [these docs](#command-line) for more details.
|
See [these docs](#command-line) for more details.
|
||||||
|
|
||||||
The returned lines aren't sorted by default, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
|
||||||
|
if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
|
||||||
|
Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
||||||
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
|
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
|
||||||
or at client side with the usual `sort` command according to [these docs](#command-line).
|
or at client side with the usual `sort` command according to [these docs](#command-line).
|
||||||
|
|
||||||
|
|
132
lib/logjson/parser.go
Normal file
132
lib/logjson/parser.go
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
package logjson
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
"github.com/valyala/fastjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Parser parses a single JSON log message into Fields.
|
||||||
|
//
|
||||||
|
// See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model
|
||||||
|
//
|
||||||
|
// Use GetParser() for obtaining the parser.
|
||||||
|
type Parser struct {
|
||||||
|
// Fields contains the parsed JSON line after Parse() call
|
||||||
|
//
|
||||||
|
// The Fields are valid until the next call to ParseLogMessage()
|
||||||
|
// or until the parser is returned to the pool with PutParser() call.
|
||||||
|
Fields []logstorage.Field
|
||||||
|
|
||||||
|
// p is used for fast JSON parsing
|
||||||
|
p fastjson.Parser
|
||||||
|
|
||||||
|
// buf is used for holding the backing data for Fields
|
||||||
|
buf []byte
|
||||||
|
|
||||||
|
// prefixBuf is used for holding the current key prefix
|
||||||
|
// when it is composed from multiple keys.
|
||||||
|
prefixBuf []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) reset() {
|
||||||
|
fields := p.Fields
|
||||||
|
for i := range fields {
|
||||||
|
lf := &fields[i]
|
||||||
|
lf.Name = ""
|
||||||
|
lf.Value = ""
|
||||||
|
}
|
||||||
|
p.Fields = fields[:0]
|
||||||
|
|
||||||
|
p.buf = p.buf[:0]
|
||||||
|
p.prefixBuf = p.prefixBuf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetParser returns Parser ready to parse JSON lines.
|
||||||
|
//
|
||||||
|
// Return the parser to the pool when it is no longer needed by calling PutParser().
|
||||||
|
func GetParser() *Parser {
|
||||||
|
v := parserPool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return &Parser{}
|
||||||
|
}
|
||||||
|
return v.(*Parser)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutParser returns the parser to the pool.
|
||||||
|
//
|
||||||
|
// The parser cannot be used after returning to the pool.
|
||||||
|
func PutParser(p *Parser) {
|
||||||
|
p.reset()
|
||||||
|
parserPool.Put(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parserPool sync.Pool
|
||||||
|
|
||||||
|
// ParseLogMessage parses the given JSON log message msg into p.Fields.
|
||||||
|
//
|
||||||
|
// The p.Fields remains valid until the next call to ParseLogMessage() or PutParser().
|
||||||
|
func (p *Parser) ParseLogMessage(msg []byte) error {
|
||||||
|
s := bytesutil.ToUnsafeString(msg)
|
||||||
|
v, err := p.p.Parse(s)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot parse json: %w", err)
|
||||||
|
}
|
||||||
|
if t := v.Type(); t != fastjson.TypeObject {
|
||||||
|
return fmt.Errorf("expecting json dictionary; got %s", t)
|
||||||
|
}
|
||||||
|
p.reset()
|
||||||
|
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) {
|
||||||
|
o := v.GetObject()
|
||||||
|
o.Visit(func(k []byte, v *fastjson.Value) {
|
||||||
|
t := v.Type()
|
||||||
|
switch t {
|
||||||
|
case fastjson.TypeNull:
|
||||||
|
// Skip nulls
|
||||||
|
case fastjson.TypeObject:
|
||||||
|
// Flatten nested JSON objects.
|
||||||
|
// For example, {"foo":{"bar":"baz"}} is converted to {"foo.bar":"baz"}
|
||||||
|
prefixLen := len(prefixBuf)
|
||||||
|
prefixBuf = append(prefixBuf, k...)
|
||||||
|
prefixBuf = append(prefixBuf, '.')
|
||||||
|
dst, dstBuf, prefixBuf = appendLogFields(dst, dstBuf, prefixBuf, v)
|
||||||
|
prefixBuf = prefixBuf[:prefixLen]
|
||||||
|
case fastjson.TypeArray, fastjson.TypeNumber, fastjson.TypeTrue, fastjson.TypeFalse:
|
||||||
|
// Convert JSON arrays, numbers, true and false values to their string representation
|
||||||
|
dstBufLen := len(dstBuf)
|
||||||
|
dstBuf = v.MarshalTo(dstBuf)
|
||||||
|
value := dstBuf[dstBufLen:]
|
||||||
|
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
||||||
|
case fastjson.TypeString:
|
||||||
|
// Decode JSON strings
|
||||||
|
dstBufLen := len(dstBuf)
|
||||||
|
dstBuf = append(dstBuf, v.GetStringBytes()...)
|
||||||
|
value := dstBuf[dstBufLen:]
|
||||||
|
dst, dstBuf = appendLogField(dst, dstBuf, prefixBuf, k, value)
|
||||||
|
default:
|
||||||
|
logger.Panicf("BUG: unexpected JSON type: %s", t)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return dst, dstBuf, prefixBuf
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) {
|
||||||
|
dstBufLen := len(dstBuf)
|
||||||
|
dstBuf = append(dstBuf, prefixBuf...)
|
||||||
|
dstBuf = append(dstBuf, k...)
|
||||||
|
name := dstBuf[dstBufLen:]
|
||||||
|
|
||||||
|
dst = append(dst, logstorage.Field{
|
||||||
|
Name: bytesutil.ToUnsafeString(name),
|
||||||
|
Value: bytesutil.ToUnsafeString(value),
|
||||||
|
})
|
||||||
|
return dst, dstBuf
|
||||||
|
}
|
71
lib/logjson/parser_test.go
Normal file
71
lib/logjson/parser_test.go
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
package logjson
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParserFailure(t *testing.T) {
|
||||||
|
f := func(data string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
p := GetParser()
|
||||||
|
err := p.ParseLogMessage([]byte(data))
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expecting non-nil error")
|
||||||
|
}
|
||||||
|
PutParser(p)
|
||||||
|
}
|
||||||
|
f("")
|
||||||
|
f("{foo")
|
||||||
|
f("[1,2,3]")
|
||||||
|
f(`{"foo",}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParserSuccess(t *testing.T) {
|
||||||
|
f := func(data string, fieldsExpected []logstorage.Field) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
p := GetParser()
|
||||||
|
err := p.ParseLogMessage([]byte(data))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(p.Fields, fieldsExpected) {
|
||||||
|
t.Fatalf("unexpected fields;\ngot\n%s\nwant\n%s", p.Fields, fieldsExpected)
|
||||||
|
}
|
||||||
|
PutParser(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
f("{}", nil)
|
||||||
|
f(`{"foo":"bar"}`, []logstorage.Field{
|
||||||
|
{
|
||||||
|
Name: "foo",
|
||||||
|
Value: "bar",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, []logstorage.Field{
|
||||||
|
{
|
||||||
|
Name: "foo.bar",
|
||||||
|
Value: "baz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "a",
|
||||||
|
Value: "1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "b",
|
||||||
|
Value: "true",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "c",
|
||||||
|
Value: "[1,2]",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "d",
|
||||||
|
Value: "false",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue