diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 2dead9f39..f8bf40754 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -846,7 +846,8 @@ var ssPool sync.Pool // Data processing is immediately stopped if f returns non-nil error. // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block. // It is the responsibility of f to filter blocks according to the given tr. -func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error { +func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline, + f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error) error { qt = qt.NewChild("export blocks: %s", sq) defer qt.Done() if deadline.Exceeded() { @@ -881,10 +882,10 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear var wg sync.WaitGroup wg.Add(gomaxprocs) for i := 0; i < gomaxprocs; i++ { - go func() { + go func(workerID uint) { defer wg.Done() for xw := range workCh { - if err := f(&xw.mn, &xw.b, tr); err != nil { + if err := f(&xw.mn, &xw.b, tr, workerID); err != nil { errGlobalLock.Lock() if errGlobal != nil { errGlobal = err @@ -895,7 +896,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear xw.reset() exportWorkPool.Put(xw) } - }() + }(uint(i)) } // Feed workers with work diff --git a/app/vmselect/prometheus/export.qtpl b/app/vmselect/prometheus/export.qtpl index 90bbbd5f5..06f675e54 100644 --- a/app/vmselect/prometheus/export.qtpl +++ b/app/vmselect/prometheus/export.qtpl @@ -126,49 +126,24 @@ } {% endfunc %} -{% func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %} +{% func ExportPromAPIHeader() %} { - {% code - lines := 0 - bytesTotal := 0 - %} "status":"success", "data":{ "resultType":"matrix", "result":[ - {% code bb, ok := <-resultsCh %} - {% if ok %} - {%z= bb.B %} - {% code - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - %} - {% for bb := range resultsCh %} - ,{%z= bb.B %} - {% code - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - %} - {% endfor %} - {% endif %} +{% endfunc %} + +{% func ExportPromAPIFooter(qt *querytracer.Tracer) %} ] } {% code - qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal) + qt.Donef("export format=promapi") %} {%= dumpQueryTrace(qt) %} } {% endfunc %} -{% func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %} - {% for bb := range resultsCh %} - {%z= bb.B %} - {% code quicktemplate.ReleaseByteBuffer(bb) %} - {% endfor %} -{% endfunc %} - {% func prometheusMetricName(mn *storage.MetricName) %} {%z= mn.MetricGroup %} {% if len(mn.Tags) > 0 %} diff --git a/app/vmselect/prometheus/export.qtpl.go b/app/vmselect/prometheus/export.qtpl.go index f3913dbaa..7b20c7fa0 100644 --- a/app/vmselect/prometheus/export.qtpl.go +++ b/app/vmselect/prometheus/export.qtpl.go @@ -415,184 +415,142 @@ func ExportPromAPILine(xb *exportBlock) string { } //line app/vmselect/prometheus/export.qtpl:129 -func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { +func StreamExportPromAPIHeader(qw422016 *qt422016.Writer) { //line app/vmselect/prometheus/export.qtpl:129 - qw422016.N().S(`{`) -//line app/vmselect/prometheus/export.qtpl:132 - lines := 0 - bytesTotal := 0 + qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`) +//line app/vmselect/prometheus/export.qtpl:135 +} -//line app/vmselect/prometheus/export.qtpl:134 - qw422016.N().S(`"status":"success","data":{"resultType":"matrix","result":[`) -//line app/vmselect/prometheus/export.qtpl:139 - bb, ok := <-resultsCh +//line app/vmselect/prometheus/export.qtpl:135 +func WriteExportPromAPIHeader(qq422016 qtio422016.Writer) { +//line app/vmselect/prometheus/export.qtpl:135 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vmselect/prometheus/export.qtpl:135 + StreamExportPromAPIHeader(qw422016) +//line app/vmselect/prometheus/export.qtpl:135 + qt422016.ReleaseWriter(qw422016) +//line app/vmselect/prometheus/export.qtpl:135 +} -//line app/vmselect/prometheus/export.qtpl:140 - if ok { -//line app/vmselect/prometheus/export.qtpl:141 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:143 - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) +//line app/vmselect/prometheus/export.qtpl:135 +func ExportPromAPIHeader() string { +//line app/vmselect/prometheus/export.qtpl:135 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vmselect/prometheus/export.qtpl:135 + WriteExportPromAPIHeader(qb422016) +//line app/vmselect/prometheus/export.qtpl:135 + qs422016 := string(qb422016.B) +//line app/vmselect/prometheus/export.qtpl:135 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vmselect/prometheus/export.qtpl:135 + return qs422016 +//line app/vmselect/prometheus/export.qtpl:135 +} -//line app/vmselect/prometheus/export.qtpl:147 - for bb := range resultsCh { -//line app/vmselect/prometheus/export.qtpl:147 - qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:148 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:150 - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - -//line app/vmselect/prometheus/export.qtpl:154 - } -//line app/vmselect/prometheus/export.qtpl:155 - } -//line app/vmselect/prometheus/export.qtpl:155 +//line app/vmselect/prometheus/export.qtpl:137 +func StreamExportPromAPIFooter(qw422016 *qt422016.Writer, qt *querytracer.Tracer) { +//line app/vmselect/prometheus/export.qtpl:137 qw422016.N().S(`]}`) -//line app/vmselect/prometheus/export.qtpl:159 - qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal) +//line app/vmselect/prometheus/export.qtpl:141 + qt.Donef("export format=promapi") -//line app/vmselect/prometheus/export.qtpl:161 +//line app/vmselect/prometheus/export.qtpl:143 streamdumpQueryTrace(qw422016, qt) -//line app/vmselect/prometheus/export.qtpl:161 +//line app/vmselect/prometheus/export.qtpl:143 qw422016.N().S(`}`) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 } -//line app/vmselect/prometheus/export.qtpl:163 -func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 +func WriteExportPromAPIFooter(qq422016 qtio422016.Writer, qt *querytracer.Tracer) { +//line app/vmselect/prometheus/export.qtpl:145 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:163 - StreamExportPromAPIResponse(qw422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 + StreamExportPromAPIFooter(qw422016, qt) +//line app/vmselect/prometheus/export.qtpl:145 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 } -//line app/vmselect/prometheus/export.qtpl:163 -func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string { -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 +func ExportPromAPIFooter(qt *querytracer.Tracer) string { +//line app/vmselect/prometheus/export.qtpl:145 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:163 - WriteExportPromAPIResponse(qb422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 + WriteExportPromAPIFooter(qb422016, qt) +//line app/vmselect/prometheus/export.qtpl:145 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 return qs422016 -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 } -//line app/vmselect/prometheus/export.qtpl:165 -func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { -//line app/vmselect/prometheus/export.qtpl:166 - for bb := range resultsCh { -//line app/vmselect/prometheus/export.qtpl:167 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:168 - quicktemplate.ReleaseByteBuffer(bb) - -//line app/vmselect/prometheus/export.qtpl:169 - } -//line app/vmselect/prometheus/export.qtpl:170 -} - -//line app/vmselect/prometheus/export.qtpl:170 -func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { -//line app/vmselect/prometheus/export.qtpl:170 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:170 - StreamExportStdResponse(qw422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:170 - qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:170 -} - -//line app/vmselect/prometheus/export.qtpl:170 -func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string { -//line app/vmselect/prometheus/export.qtpl:170 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:170 - WriteExportStdResponse(qb422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:170 - qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:170 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:170 - return qs422016 -//line app/vmselect/prometheus/export.qtpl:170 -} - -//line app/vmselect/prometheus/export.qtpl:172 +//line app/vmselect/prometheus/export.qtpl:147 func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) { -//line app/vmselect/prometheus/export.qtpl:173 +//line app/vmselect/prometheus/export.qtpl:148 qw422016.N().Z(mn.MetricGroup) -//line app/vmselect/prometheus/export.qtpl:174 +//line app/vmselect/prometheus/export.qtpl:149 if len(mn.Tags) > 0 { -//line app/vmselect/prometheus/export.qtpl:174 +//line app/vmselect/prometheus/export.qtpl:149 qw422016.N().S(`{`) -//line app/vmselect/prometheus/export.qtpl:176 +//line app/vmselect/prometheus/export.qtpl:151 tags := mn.Tags -//line app/vmselect/prometheus/export.qtpl:177 +//line app/vmselect/prometheus/export.qtpl:152 qw422016.N().Z(tags[0].Key) -//line app/vmselect/prometheus/export.qtpl:177 +//line app/vmselect/prometheus/export.qtpl:152 qw422016.N().S(`=`) -//line app/vmselect/prometheus/export.qtpl:177 +//line app/vmselect/prometheus/export.qtpl:152 qw422016.N().QZ(tags[0].Value) -//line app/vmselect/prometheus/export.qtpl:178 +//line app/vmselect/prometheus/export.qtpl:153 tags = tags[1:] -//line app/vmselect/prometheus/export.qtpl:179 +//line app/vmselect/prometheus/export.qtpl:154 for i := range tags { -//line app/vmselect/prometheus/export.qtpl:180 +//line app/vmselect/prometheus/export.qtpl:155 tag := &tags[i] -//line app/vmselect/prometheus/export.qtpl:180 +//line app/vmselect/prometheus/export.qtpl:155 qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:181 +//line app/vmselect/prometheus/export.qtpl:156 qw422016.N().Z(tag.Key) -//line app/vmselect/prometheus/export.qtpl:181 +//line app/vmselect/prometheus/export.qtpl:156 qw422016.N().S(`=`) -//line app/vmselect/prometheus/export.qtpl:181 +//line app/vmselect/prometheus/export.qtpl:156 qw422016.N().QZ(tag.Value) -//line app/vmselect/prometheus/export.qtpl:182 +//line app/vmselect/prometheus/export.qtpl:157 } -//line app/vmselect/prometheus/export.qtpl:182 +//line app/vmselect/prometheus/export.qtpl:157 qw422016.N().S(`}`) -//line app/vmselect/prometheus/export.qtpl:184 +//line app/vmselect/prometheus/export.qtpl:159 } -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 } -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) { -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 streamprometheusMetricName(qw422016, mn) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 } -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 func prometheusMetricName(mn *storage.MetricName) string { -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 writeprometheusMetricName(qb422016, mn) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 return qs422016 -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 33e49f54e..d451e956f 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -5,9 +5,11 @@ import ( "fmt" "math" "net/http" + "runtime" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" @@ -16,7 +18,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -25,7 +26,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/valyala/fastjson/fastfloat" - "github.com/valyala/quicktemplate" ) var ( @@ -84,37 +84,19 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request w.Header().Set("Content-Type", "text/plain; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - var m sync.Map + sw := newScalableWriter(bw) err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error { if err := bw.Error(); err != nil { return err } - v, ok := m.Load(workerID) - if !ok { - v = &bytesutil.ByteBuffer{} - m.Store(workerID, v) - } - bb := v.(*bytesutil.ByteBuffer) + bb := sw.getBuffer(workerID) WriteFederate(bb, rs) - if len(bb.B) < 1024*1024 { - return nil - } - _, err := bw.Write(bb.B) - bb.Reset() - return err + return sw.maybeFlushBuffer(bb) }) if err != nil { return fmt.Errorf("error during sending data to remote client: %w", err) } - m.Range(func(k, v interface{}) bool { - bb := v.(*bytesutil.ByteBuffer) - _, err := bw.Write(bb.B) - return err == nil - }) - if err := bw.Flush(); err != nil { - return err - } - return nil + return sw.flush() } var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) @@ -139,15 +121,14 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques w.Header().Set("Content-Type", "text/csv; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - - resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) - writeCSVLine := func(xb *exportBlock) { + sw := newScalableWriter(bw) + writeCSVLine := func(xb *exportBlock, workerID uint) error { if len(xb.timestamps) == 0 { - return + return nil } - bb := quicktemplate.AcquireByteBuffer() + bb := sw.getBuffer(workerID) WriteExportCSVLine(bb, xb, fieldNames) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } doneCh := make(chan error, 1) if !reduceMemUsage { @@ -164,17 +145,18 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques xb.mn = &rs.MetricName xb.timestamps = rs.Timestamps xb.values = rs.Values - writeCSVLine(xb) + if err := writeCSVLine(xb, workerID); err != nil { + return err + } xb.reset() exportBlockPool.Put(xb) return nil }) - close(resultsCh) doneCh <- err }() } else { go func() { - err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } @@ -184,29 +166,21 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques xb := exportBlockPool.Get().(*exportBlock) xb.mn = mn xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) - writeCSVLine(xb) + if err := writeCSVLine(xb, workerID); err != nil { + return err + } xb.reset() exportBlockPool.Put(xb) return nil }) - close(resultsCh) doneCh <- err }() } - // Consume all the data from resultsCh. - for bb := range resultsCh { - // Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above. - _, _ = bw.Write(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - } - if err := bw.Flush(); err != nil { - return err - } err = <-doneCh if err != nil { return fmt.Errorf("error during sending the exported csv data to remote client: %w", err) } - return nil + return sw.flush() } var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`) @@ -224,6 +198,7 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req w.Header().Set("Content-Type", "VictoriaMetrics/native") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) + sw := newScalableWriter(bw) // Marshal tr trBuf := make([]byte, 0, 16) @@ -232,13 +207,13 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req _, _ = bw.Write(trBuf) // Marshal native blocks. - err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } - dstBuf := bbPool.Get() + bb := sw.getBuffer(workerID) + dst := bb.B tmpBuf := bbPool.Get() - dst := dstBuf.B tmp := tmpBuf.B // Marshal mn @@ -254,19 +229,13 @@ func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Req tmpBuf.B = tmp bbPool.Put(tmpBuf) - _, err := bw.Write(dst) - - dstBuf.B = dst - bbPool.Put(dstBuf) - return err + bb.B = dst + return sw.maybeFlushBuffer(bb) }) if err != nil { return fmt.Errorf("error during sending native data to remote client: %w", err) } - if err := bw.Flush(); err != nil { - return fmt.Errorf("error during flushing native data to remote client: %w", err) - } - return nil + return sw.flush() } var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`) @@ -293,31 +262,48 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error { - writeResponseFunc := WriteExportStdResponse - writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + sw := newScalableWriter(bw) + writeLineFunc := func(xb *exportBlock, workerID uint) error { + bb := sw.getBuffer(workerID) WriteExportJSONLine(bb, xb) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } contentType := "application/stream+json; charset=utf-8" if format == "prometheus" { contentType = "text/plain; charset=utf-8" - writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() + writeLineFunc = func(xb *exportBlock, workerID uint) error { + bb := sw.getBuffer(workerID) WriteExportPrometheusLine(bb, xb) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } } else if format == "promapi" { - writeResponseFunc = WriteExportPromAPIResponse - writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() + WriteExportPromAPIHeader(bw) + firstLineOnce := uint32(0) + firstLineSent := uint32(0) + writeLineFunc = func(xb *exportBlock, workerID uint) error { + bb := sw.getBuffer(workerID) + if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) { + // Send the first line to sw.bw + WriteExportPromAPILine(bb, xb) + _, err := sw.bw.Write(bb.B) + bb.Reset() + atomic.StoreUint32(&firstLineSent, 1) + return err + } + for atomic.LoadUint32(&firstLineSent) == 0 { + // Busy wait until the first line is sent to sw.bw + runtime.Gosched() + } + bb.B = append(bb.B, ',') WriteExportPromAPILine(bb, xb) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } } if maxRowsPerLine > 0 { writeLineFuncOrig := writeLineFunc - writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { + writeLineFunc = func(xb *exportBlock, workerID uint) error { valuesOrig := xb.values timestampsOrig := xb.timestamps values := valuesOrig @@ -338,19 +324,19 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara } xb.values = valuesChunk xb.timestamps = timestampsChunk - writeLineFuncOrig(xb, resultsCh) + if err := writeLineFuncOrig(xb, workerID); err != nil { + return err + } } xb.values = valuesOrig xb.timestamps = timestampsOrig + return nil } } sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxExportSeries) w.Header().Set("Content-Type", contentType) - bw := bufferedwriter.Get(w) - defer bufferedwriter.Put(bw) - resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) doneCh := make(chan error, 1) if !reduceMemUsage { rss, err := netstorage.ProcessSearchQuery(qt, sq, cp.deadline) @@ -367,19 +353,20 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara xb.mn = &rs.MetricName xb.timestamps = rs.Timestamps xb.values = rs.Values - writeLineFunc(xb, resultsCh) + if err := writeLineFunc(xb, workerID); err != nil { + return err + } xb.reset() exportBlockPool.Put(xb) return nil }) qtChild.Done() - close(resultsCh) doneCh <- err }() } else { qtChild := qt.NewChild("background export format=%s", format) go func() { - err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } @@ -390,26 +377,30 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara xb.mn = mn xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) if len(xb.timestamps) > 0 { - writeLineFunc(xb, resultsCh) + if err := writeLineFunc(xb, workerID); err != nil { + return err + } } xb.reset() exportBlockPool.Put(xb) return nil }) qtChild.Done() - close(resultsCh) doneCh <- err }() } - - // writeResponseFunc must consume all the data from resultsCh. - writeResponseFunc(bw, resultsCh, qt) - if err := bw.Flush(); err != nil { - return err - } err := <-doneCh if err != nil { - return fmt.Errorf("error during sending the data to remote client: %w", err) + return fmt.Errorf("cannot send data to remote client: %w", err) + } + if err := sw.flush(); err != nil { + return fmt.Errorf("cannot send data to remote client: %w", err) + } + if format == "promapi" { + WriteExportPromAPIFooter(bw, qt) + } + if err := bw.Flush(); err != nil { + return err } return nil } @@ -1137,3 +1128,41 @@ func getCommonParams(r *http.Request, startTime time.Time, requireNonEmptyMatch } return cp, nil } + +type scalableWriter struct { + bw *bufferedwriter.Writer + m sync.Map +} + +func newScalableWriter(bw *bufferedwriter.Writer) *scalableWriter { + return &scalableWriter{ + bw: bw, + } +} + +func (sw *scalableWriter) getBuffer(workerID uint) *bytesutil.ByteBuffer { + v, ok := sw.m.Load(workerID) + if !ok { + v = &bytesutil.ByteBuffer{} + sw.m.Store(workerID, v) + } + return v.(*bytesutil.ByteBuffer) +} + +func (sw *scalableWriter) maybeFlushBuffer(bb *bytesutil.ByteBuffer) error { + if len(bb.B) < 1024*1024 { + return nil + } + _, err := sw.bw.Write(bb.B) + bb.Reset() + return err +} + +func (sw *scalableWriter) flush() error { + sw.m.Range(func(k, v interface{}) bool { + bb := v.(*bytesutil.ByteBuffer) + _, err := sw.bw.Write(bb.B) + return err == nil + }) + return sw.bw.Flush() +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6f3ece426..f828e66bc 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -27,6 +27,7 @@ See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#m * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x for non-trivial `regex` values such as `([^:]+):.+`, which can be used for extracting a `host` part from `host:port` label value. * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): improve performance by up to 4x for queries containing non-trivial `regex` filters such as `{path=~"/foo/.+|/bar"}`. +* FEATURE: improve performance scalability on systems with many CPU cores for [/federate](https://docs.victoriametrics.com/#federation) and [/api/v1/export/...](https://docs.victoriametrics.com/#how-to-export-time-series) endpoints. * FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105). * FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113). * FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102).