mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect: improve performance scalability on multi-CPU systems for /api/v1/export/...
endpoints
This commit is contained in:
parent
49311ae977
commit
c53b7e66ef
5 changed files with 208 additions and 244 deletions
|
@ -846,7 +846,8 @@ var ssPool sync.Pool
|
|||
// Data processing is immediately stopped if f returns non-nil error.
|
||||
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
|
||||
// It is the responsibility of f to filter blocks according to the given tr.
|
||||
func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {
|
||||
func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline,
|
||||
f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error) error {
|
||||
qt = qt.NewChild("export blocks: %s", sq)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
|
@ -881,10 +882,10 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
|||
var wg sync.WaitGroup
|
||||
wg.Add(gomaxprocs)
|
||||
for i := 0; i < gomaxprocs; i++ {
|
||||
go func() {
|
||||
go func(workerID uint) {
|
||||
defer wg.Done()
|
||||
for xw := range workCh {
|
||||
if err := f(&xw.mn, &xw.b, tr); err != nil {
|
||||
if err := f(&xw.mn, &xw.b, tr, workerID); err != nil {
|
||||
errGlobalLock.Lock()
|
||||
if errGlobal != nil {
|
||||
errGlobal = err
|
||||
|
@ -895,7 +896,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
|
|||
xw.reset()
|
||||
exportWorkPool.Put(xw)
|
||||
}
|
||||
}()
|
||||
}(uint(i))
|
||||
}
|
||||
|
||||
// Feed workers with work
|
||||
|
|
|
@ -126,49 +126,24 @@
|
|||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %}
|
||||
{% func ExportPromAPIHeader() %}
|
||||
{
|
||||
{% code
|
||||
lines := 0
|
||||
bytesTotal := 0
|
||||
%}
|
||||
"status":"success",
|
||||
"data":{
|
||||
"resultType":"matrix",
|
||||
"result":[
|
||||
{% code bb, ok := <-resultsCh %}
|
||||
{% if ok %}
|
||||
{%z= bb.B %}
|
||||
{% code
|
||||
lines++
|
||||
bytesTotal += len(bb.B)
|
||||
quicktemplate.ReleaseByteBuffer(bb)
|
||||
%}
|
||||
{% for bb := range resultsCh %}
|
||||
,{%z= bb.B %}
|
||||
{% code
|
||||
lines++
|
||||
bytesTotal += len(bb.B)
|
||||
quicktemplate.ReleaseByteBuffer(bb)
|
||||
%}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
{% endfunc %}
|
||||
|
||||
{% func ExportPromAPIFooter(qt *querytracer.Tracer) %}
|
||||
]
|
||||
}
|
||||
{% code
|
||||
qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal)
|
||||
qt.Donef("export format=promapi")
|
||||
%}
|
||||
{%= dumpQueryTrace(qt) %}
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %}
|
||||
{% for bb := range resultsCh %}
|
||||
{%z= bb.B %}
|
||||
{% code quicktemplate.ReleaseByteBuffer(bb) %}
|
||||
{% endfor %}
|
||||
{% endfunc %}
|
||||
|
||||
{% func prometheusMetricName(mn *storage.MetricName) %}
|
||||
{%z= mn.MetricGroup %}
|
||||
{% if len(mn.Tags) > 0 %}
|
||||
|
|
|
@ -415,184 +415,142 @@ func ExportPromAPILine(xb *exportBlock) string {
|
|||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:129
|
||||
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) {
|
||||
func StreamExportPromAPIHeader(qw422016 *qt422016.Writer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:129
|
||||
qw422016.N().S(`{`)
|
||||
//line app/vmselect/prometheus/export.qtpl:132
|
||||
lines := 0
|
||||
bytesTotal := 0
|
||||
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:134
|
||||
qw422016.N().S(`"status":"success","data":{"resultType":"matrix","result":[`)
|
||||
//line app/vmselect/prometheus/export.qtpl:139
|
||||
bb, ok := <-resultsCh
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
func WriteExportPromAPIHeader(qq422016 qtio422016.Writer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
StreamExportPromAPIHeader(qw422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:140
|
||||
if ok {
|
||||
//line app/vmselect/prometheus/export.qtpl:141
|
||||
qw422016.N().Z(bb.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:143
|
||||
lines++
|
||||
bytesTotal += len(bb.B)
|
||||
quicktemplate.ReleaseByteBuffer(bb)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
func ExportPromAPIHeader() string {
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
WriteExportPromAPIHeader(qb422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/export.qtpl:135
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:147
|
||||
for bb := range resultsCh {
|
||||
//line app/vmselect/prometheus/export.qtpl:147
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmselect/prometheus/export.qtpl:148
|
||||
qw422016.N().Z(bb.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:150
|
||||
lines++
|
||||
bytesTotal += len(bb.B)
|
||||
quicktemplate.ReleaseByteBuffer(bb)
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:154
|
||||
}
|
||||
//line app/vmselect/prometheus/export.qtpl:155
|
||||
}
|
||||
//line app/vmselect/prometheus/export.qtpl:155
|
||||
//line app/vmselect/prometheus/export.qtpl:137
|
||||
func StreamExportPromAPIFooter(qw422016 *qt422016.Writer, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:137
|
||||
qw422016.N().S(`]}`)
|
||||
//line app/vmselect/prometheus/export.qtpl:159
|
||||
qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal)
|
||||
//line app/vmselect/prometheus/export.qtpl:141
|
||||
qt.Donef("export format=promapi")
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:161
|
||||
//line app/vmselect/prometheus/export.qtpl:143
|
||||
streamdumpQueryTrace(qw422016, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:161
|
||||
//line app/vmselect/prometheus/export.qtpl:143
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
func WriteExportPromAPIFooter(qq422016 qtio422016.Writer, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
StreamExportPromAPIResponse(qw422016, resultsCh, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
StreamExportPromAPIFooter(qw422016, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string {
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
func ExportPromAPIFooter(qt *querytracer.Tracer) string {
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
WriteExportPromAPIResponse(qb422016, resultsCh, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
WriteExportPromAPIFooter(qb422016, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/export.qtpl:163
|
||||
//line app/vmselect/prometheus/export.qtpl:145
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:165
|
||||
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:166
|
||||
for bb := range resultsCh {
|
||||
//line app/vmselect/prometheus/export.qtpl:167
|
||||
qw422016.N().Z(bb.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:168
|
||||
quicktemplate.ReleaseByteBuffer(bb)
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:169
|
||||
}
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
StreamExportStdResponse(qw422016, resultsCh, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string {
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
WriteExportStdResponse(qb422016, resultsCh, qt)
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/export.qtpl:170
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:172
|
||||
//line app/vmselect/prometheus/export.qtpl:147
|
||||
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
|
||||
//line app/vmselect/prometheus/export.qtpl:173
|
||||
//line app/vmselect/prometheus/export.qtpl:148
|
||||
qw422016.N().Z(mn.MetricGroup)
|
||||
//line app/vmselect/prometheus/export.qtpl:174
|
||||
//line app/vmselect/prometheus/export.qtpl:149
|
||||
if len(mn.Tags) > 0 {
|
||||
//line app/vmselect/prometheus/export.qtpl:174
|
||||
//line app/vmselect/prometheus/export.qtpl:149
|
||||
qw422016.N().S(`{`)
|
||||
//line app/vmselect/prometheus/export.qtpl:176
|
||||
//line app/vmselect/prometheus/export.qtpl:151
|
||||
tags := mn.Tags
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:177
|
||||
//line app/vmselect/prometheus/export.qtpl:152
|
||||
qw422016.N().Z(tags[0].Key)
|
||||
//line app/vmselect/prometheus/export.qtpl:177
|
||||
//line app/vmselect/prometheus/export.qtpl:152
|
||||
qw422016.N().S(`=`)
|
||||
//line app/vmselect/prometheus/export.qtpl:177
|
||||
//line app/vmselect/prometheus/export.qtpl:152
|
||||
qw422016.N().QZ(tags[0].Value)
|
||||
//line app/vmselect/prometheus/export.qtpl:178
|
||||
//line app/vmselect/prometheus/export.qtpl:153
|
||||
tags = tags[1:]
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:179
|
||||
//line app/vmselect/prometheus/export.qtpl:154
|
||||
for i := range tags {
|
||||
//line app/vmselect/prometheus/export.qtpl:180
|
||||
//line app/vmselect/prometheus/export.qtpl:155
|
||||
tag := &tags[i]
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:180
|
||||
//line app/vmselect/prometheus/export.qtpl:155
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmselect/prometheus/export.qtpl:181
|
||||
//line app/vmselect/prometheus/export.qtpl:156
|
||||
qw422016.N().Z(tag.Key)
|
||||
//line app/vmselect/prometheus/export.qtpl:181
|
||||
//line app/vmselect/prometheus/export.qtpl:156
|
||||
qw422016.N().S(`=`)
|
||||
//line app/vmselect/prometheus/export.qtpl:181
|
||||
//line app/vmselect/prometheus/export.qtpl:156
|
||||
qw422016.N().QZ(tag.Value)
|
||||
//line app/vmselect/prometheus/export.qtpl:182
|
||||
//line app/vmselect/prometheus/export.qtpl:157
|
||||
}
|
||||
//line app/vmselect/prometheus/export.qtpl:182
|
||||
//line app/vmselect/prometheus/export.qtpl:157
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/export.qtpl:184
|
||||
//line app/vmselect/prometheus/export.qtpl:159
|
||||
}
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
streamprometheusMetricName(qw422016, mn)
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
func prometheusMetricName(mn *storage.MetricName) string {
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
writeprometheusMetricName(qb422016, mn)
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/export.qtpl:185
|
||||
//line app/vmselect/prometheus/export.qtpl:160
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
|
||||
|
@ -16,7 +18,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
|
@ -25,7 +26,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/fastjson/fastfloat"
|
||||
"github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -84,37 +84,19 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
|
|||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
var m sync.Map
|
||||
sw := newScalableWriter(bw)
|
||||
err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error {
|
||||
if err := bw.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
v, ok := m.Load(workerID)
|
||||
if !ok {
|
||||
v = &bytesutil.ByteBuffer{}
|
||||
m.Store(workerID, v)
|
||||
}
|
||||
bb := v.(*bytesutil.ByteBuffer)
|
||||
bb := sw.getBuffer(workerID)
|
||||
WriteFederate(bb, rs)
|
||||
if len(bb.B) < 1024*1024 {
|
||||
return nil
|
||||
}
|
||||
_, err := bw.Write(bb.B)
|
||||
bb.Reset()
|
||||
return err
|
||||
return sw.maybeFlushBuffer(bb)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error during sending data to remote client: %w", err)
|
||||
}
|
||||
m.Range(func(k, v interface{}) bool {
|
||||
bb := v.(*bytesutil.ByteBuffer)
|
||||
_, err := bw.Write(bb.B)
|
||||
return err == nil
|
||||
})
|
||||
if err := bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return sw.flush()
|
||||
}
|
||||
|
||||
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
|
||||
|
@ -139,15 +121,14 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
|
|||
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
|
||||
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
|
||||
writeCSVLine := func(xb *exportBlock) {
|
||||
sw := newScalableWriter(bw)
|
||||
writeCSVLine := func(xb *exportBlock, workerID uint) error {
|
||||
if len(xb.timestamps) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
bb := quicktemplate.AcquireByteBuffer()
|
||||
bb := sw.getBuffer(workerID)
|
||||
WriteExportCSVLine(bb, xb, fieldNames)
|
||||
resultsCh <- bb
|
||||
return sw.maybeFlushBuffer(bb)
|
||||
}
|
||||
doneCh := make(chan error, 1)
|
||||
if !reduceMemUsage {
|
||||
|
@ -164,17 +145,18 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
|
|||
xb.mn = &rs.MetricName
|
||||
xb.timestamps = rs.Timestamps
|
||||
xb.values = rs.Values
|
||||
writeCSVLine(xb)
|
||||
if err := writeCSVLine(xb, workerID); err != nil {
|
||||
return err
|
||||
}
|
||||
xb.reset()
|
||||
exportBlockPool.Put(xb)
|
||||
return nil
|
||||
})
|
||||
close(resultsCh)
|
||||
doneCh <- err
|
||||
}()
|
||||
} else {
|
||||
go func() {
|
||||
err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
|
||||
err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
|
||||
if err := bw.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -184,29 +166,21 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
|
|||
xb := exportBlockPool.Get().(*exportBlock)
|
||||
xb.mn = mn
|
||||
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
|
||||
writeCSVLine(xb)
|
||||
if err := writeCSVLine(xb, workerID); err != nil {
|
||||
return err
|
||||
}
|
||||
xb.reset()
|
||||
exportBlockPool.Put(xb)
|
||||
return nil
|
||||
})
|
||||
close(resultsCh)
|
||||
doneCh <- err
|
||||
}()
|
||||
}
|
||||
// Consume all the data from resultsCh.
|
||||
for bb := range resultsCh {
|
||||
// Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above.
|
||||
_, _ = bw.Write(bb.B)
|
||||
quicktemplate.ReleaseByteBuffer(bb)
|
||||
}
|
||||
if err := bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = <-doneCh
|
||||
if err != nil {
|
||||
return fmt.Errorf("error during sending the exported csv data to remote client: %w", err)
|
||||
}
|
||||
return nil
|
||||
return sw.flush()
|
||||
}
|
||||
|
||||
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
|
||||
|
@ -224,6 +198,7 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
|
|||
w.Header().Set("Content-Type", "VictoriaMetrics/native")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
sw := newScalableWriter(bw)
|
||||
|
||||
// Marshal tr
|
||||
trBuf := make([]byte, 0, 16)
|
||||
|
@ -232,13 +207,13 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
|
|||
_, _ = bw.Write(trBuf)
|
||||
|
||||
// Marshal native blocks.
|
||||
err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
|
||||
err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
|
||||
if err := bw.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
dstBuf := bbPool.Get()
|
||||
bb := sw.getBuffer(workerID)
|
||||
dst := bb.B
|
||||
tmpBuf := bbPool.Get()
|
||||
dst := dstBuf.B
|
||||
tmp := tmpBuf.B
|
||||
|
||||
// Marshal mn
|
||||
|
@ -254,19 +229,13 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
|
|||
tmpBuf.B = tmp
|
||||
bbPool.Put(tmpBuf)
|
||||
|
||||
_, err := bw.Write(dst)
|
||||
|
||||
dstBuf.B = dst
|
||||
bbPool.Put(dstBuf)
|
||||
return err
|
||||
bb.B = dst
|
||||
return sw.maybeFlushBuffer(bb)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error during sending native data to remote client: %w", err)
|
||||
}
|
||||
if err := bw.Flush(); err != nil {
|
||||
return fmt.Errorf("error during flushing native data to remote client: %w", err)
|
||||
}
|
||||
return nil
|
||||
return sw.flush()
|
||||
}
|
||||
|
||||
var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`)
|
||||
|
@ -293,31 +262,48 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
|
|||
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
|
||||
|
||||
func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error {
|
||||
writeResponseFunc := WriteExportStdResponse
|
||||
writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
|
||||
bb := quicktemplate.AcquireByteBuffer()
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
sw := newScalableWriter(bw)
|
||||
writeLineFunc := func(xb *exportBlock, workerID uint) error {
|
||||
bb := sw.getBuffer(workerID)
|
||||
WriteExportJSONLine(bb, xb)
|
||||
resultsCh <- bb
|
||||
return sw.maybeFlushBuffer(bb)
|
||||
}
|
||||
contentType := "application/stream+json; charset=utf-8"
|
||||
if format == "prometheus" {
|
||||
contentType = "text/plain; charset=utf-8"
|
||||
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
|
||||
bb := quicktemplate.AcquireByteBuffer()
|
||||
writeLineFunc = func(xb *exportBlock, workerID uint) error {
|
||||
bb := sw.getBuffer(workerID)
|
||||
WriteExportPrometheusLine(bb, xb)
|
||||
resultsCh <- bb
|
||||
return sw.maybeFlushBuffer(bb)
|
||||
}
|
||||
} else if format == "promapi" {
|
||||
writeResponseFunc = WriteExportPromAPIResponse
|
||||
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
|
||||
bb := quicktemplate.AcquireByteBuffer()
|
||||
WriteExportPromAPIHeader(bw)
|
||||
firstLineOnce := uint32(0)
|
||||
firstLineSent := uint32(0)
|
||||
writeLineFunc = func(xb *exportBlock, workerID uint) error {
|
||||
bb := sw.getBuffer(workerID)
|
||||
if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
|
||||
// Send the first line to sw.bw
|
||||
WriteExportPromAPILine(bb, xb)
|
||||
_, err := sw.bw.Write(bb.B)
|
||||
bb.Reset()
|
||||
atomic.StoreUint32(&firstLineSent, 1)
|
||||
return err
|
||||
}
|
||||
for atomic.LoadUint32(&firstLineSent) == 0 {
|
||||
// Busy wait until the first line is sent to sw.bw
|
||||
runtime.Gosched()
|
||||
}
|
||||
bb.B = append(bb.B, ',')
|
||||
WriteExportPromAPILine(bb, xb)
|
||||
resultsCh <- bb
|
||||
return sw.maybeFlushBuffer(bb)
|
||||
}
|
||||
}
|
||||
if maxRowsPerLine > 0 {
|
||||
writeLineFuncOrig := writeLineFunc
|
||||
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
|
||||
writeLineFunc = func(xb *exportBlock, workerID uint) error {
|
||||
valuesOrig := xb.values
|
||||
timestampsOrig := xb.timestamps
|
||||
values := valuesOrig
|
||||
|
@ -338,19 +324,19 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
|
|||
}
|
||||
xb.values = valuesChunk
|
||||
xb.timestamps = timestampsChunk
|
||||
writeLineFuncOrig(xb, resultsCh)
|
||||
if err := writeLineFuncOrig(xb, workerID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
xb.values = valuesOrig
|
||||
xb.timestamps = timestampsOrig
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxExportSeries)
|
||||
w.Header().Set("Content-Type", contentType)
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
|
||||
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
|
||||
doneCh := make(chan error, 1)
|
||||
if !reduceMemUsage {
|
||||
rss, err := netstorage.ProcessSearchQuery(qt, sq, cp.deadline)
|
||||
|
@ -367,19 +353,20 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
|
|||
xb.mn = &rs.MetricName
|
||||
xb.timestamps = rs.Timestamps
|
||||
xb.values = rs.Values
|
||||
writeLineFunc(xb, resultsCh)
|
||||
if err := writeLineFunc(xb, workerID); err != nil {
|
||||
return err
|
||||
}
|
||||
xb.reset()
|
||||
exportBlockPool.Put(xb)
|
||||
return nil
|
||||
})
|
||||
qtChild.Done()
|
||||
close(resultsCh)
|
||||
doneCh <- err
|
||||
}()
|
||||
} else {
|
||||
qtChild := qt.NewChild("background export format=%s", format)
|
||||
go func() {
|
||||
err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
|
||||
err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
|
||||
if err := bw.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -390,26 +377,30 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
|
|||
xb.mn = mn
|
||||
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
|
||||
if len(xb.timestamps) > 0 {
|
||||
writeLineFunc(xb, resultsCh)
|
||||
if err := writeLineFunc(xb, workerID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
xb.reset()
|
||||
exportBlockPool.Put(xb)
|
||||
return nil
|
||||
})
|
||||
qtChild.Done()
|
||||
close(resultsCh)
|
||||
doneCh <- err
|
||||
}()
|
||||
}
|
||||
|
||||
// writeResponseFunc must consume all the data from resultsCh.
|
||||
writeResponseFunc(bw, resultsCh, qt)
|
||||
if err := bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
err := <-doneCh
|
||||
if err != nil {
|
||||
return fmt.Errorf("error during sending the data to remote client: %w", err)
|
||||
return fmt.Errorf("cannot send data to remote client: %w", err)
|
||||
}
|
||||
if err := sw.flush(); err != nil {
|
||||
return fmt.Errorf("cannot send data to remote client: %w", err)
|
||||
}
|
||||
if format == "promapi" {
|
||||
WriteExportPromAPIFooter(bw, qt)
|
||||
}
|
||||
if err := bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1137,3 +1128,41 @@ func getCommonParams(r *http.Request, startTime time.Time, requireNonEmptyMatch
|
|||
}
|
||||
return cp, nil
|
||||
}
|
||||
|
||||
type scalableWriter struct {
|
||||
bw *bufferedwriter.Writer
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
func newScalableWriter(bw *bufferedwriter.Writer) *scalableWriter {
|
||||
return &scalableWriter{
|
||||
bw: bw,
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *scalableWriter) getBuffer(workerID uint) *bytesutil.ByteBuffer {
|
||||
v, ok := sw.m.Load(workerID)
|
||||
if !ok {
|
||||
v = &bytesutil.ByteBuffer{}
|
||||
sw.m.Store(workerID, v)
|
||||
}
|
||||
return v.(*bytesutil.ByteBuffer)
|
||||
}
|
||||
|
||||
func (sw *scalableWriter) maybeFlushBuffer(bb *bytesutil.ByteBuffer) error {
|
||||
if len(bb.B) < 1024*1024 {
|
||||
return nil
|
||||
}
|
||||
_, err := sw.bw.Write(bb.B)
|
||||
bb.Reset()
|
||||
return err
|
||||
}
|
||||
|
||||
func (sw *scalableWriter) flush() error {
|
||||
sw.m.Range(func(k, v interface{}) bool {
|
||||
bb := v.(*bytesutil.ByteBuffer)
|
||||
_, err := sw.bw.Write(bb.B)
|
||||
return err == nil
|
||||
})
|
||||
return sw.bw.Flush()
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#m
|
|||
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x for non-trivial `regex` values such as `([^:]+):.+`, which can be used for extracting a `host` part from `host:port` label value.
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): improve performance by up to 4x for queries containing non-trivial `regex` filters such as `{path=~"/foo/.+|/bar"}`.
|
||||
* FEATURE: improve performance scalability on systems with many CPU cores for [/federate](https://docs.victoriametrics.com/#federation) and [/api/v1/export/...](https://docs.victoriametrics.com/#how-to-export-time-series) endpoints.
|
||||
* FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105).
|
||||
* FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113).
|
||||
* FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102).
|
||||
|
|
Loading…
Reference in a new issue