From d9f075622bc76bfe64ccc14ef95698a9cf09225e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 17 Dec 2021 10:56:03 +0200 Subject: [PATCH] app/vmselect: de-duplicate data exported via `/api/v1/export/csv` by default Previously the exported data wasn't de-duplicated. Now it is possible to export the raw data without deduplication by passing reduce_mem_usage=1 query arg to /api/v1/export/csv See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1837 --- app/vmselect/prometheus/prometheus.go | 82 +++++++++++++++++++-------- docs/CHANGELOG.md | 1 + docs/README.md | 2 +- docs/Single-server-VictoriaMetrics.md | 2 +- 4 files changed, 60 insertions(+), 27 deletions(-) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 4191d5b2e..dd118d20f 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -136,6 +136,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter if err != nil { return err } + reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage") deadline := searchutils.GetDeadlineForExport(r, startTime) tagFilterss, err := getTagFilterssFromRequest(r) if err != nil { @@ -147,30 +148,61 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter defer bufferedwriter.Put(bw) resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) - doneCh := make(chan error) - go func() { - err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { - if err := bw.Error(); err != nil { - return err - } - if err := b.UnmarshalData(); err != nil { - return fmt.Errorf("cannot unmarshal block during export: %s", err) - } - xb := exportBlockPool.Get().(*exportBlock) - xb.mn = mn - xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) - if len(xb.timestamps) > 0 { - bb := quicktemplate.AcquireByteBuffer() - WriteExportCSVLine(bb, xb, fieldNames) - resultsCh <- bb - } - xb.reset() - exportBlockPool.Put(xb) - return nil - }) - close(resultsCh) - doneCh <- err - }() + writeCSVLine := func(xb *exportBlock) { + if len(xb.timestamps) == 0 { + return + } + bb := quicktemplate.AcquireByteBuffer() + WriteExportCSVLine(bb, xb, fieldNames) + resultsCh <- bb + } + doneCh := make(chan error, 1) + if !reduceMemUsage { + // Unconditionally deny partial response for the exported data, + // since users usually expect that the exported data is full. + denyPartialResponse := true + rss, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, true, deadline) + if err != nil { + return fmt.Errorf("cannot fetch data for %q: %w", sq, err) + } + go func() { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { + if err := bw.Error(); err != nil { + return err + } + xb := exportBlockPool.Get().(*exportBlock) + xb.mn = &rs.MetricName + xb.timestamps = rs.Timestamps + xb.values = rs.Values + writeCSVLine(xb) + xb.reset() + exportBlockPool.Put(xb) + return nil + }) + close(resultsCh) + doneCh <- err + }() + } else { + go func() { + err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + if err := bw.Error(); err != nil { + return err + } + if err := b.UnmarshalData(); err != nil { + return fmt.Errorf("cannot unmarshal block during export: %s", err) + } + xb := exportBlockPool.Get().(*exportBlock) + xb.mn = mn + xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) + writeCSVLine(xb) + xb.reset() + exportBlockPool.Put(xb) + return nil + }) + close(resultsCh) + doneCh <- err + }() + } // Consume all the data from resultsCh. for bb := range resultsCh { // Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above. @@ -367,7 +399,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match defer bufferedwriter.Put(bw) resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) - doneCh := make(chan error) + doneCh := make(chan error, 1) if !reduceMemUsage { // Unconditionally deny partial response for the exported data, // since users usually expect that the exported data is full. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0b00f76e7..0956c478c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,7 @@ sort: 15 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix navigation over query history with `Ctrl+up/down` and fix zoom relatively to the cursor position. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1936). * BUGFIX: deduplicate samples more thoroughly if [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. Previously some duplicate samples may be left on disk for time series with high churn rate. This may result in bigger storage space requirements. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): follow up to 5 redirects when `follow_redirects: true` is set for a particular scrape config. Previously only a single redirect was performed in this case. It is expected these redirects are performed to the original hostname. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1945). +* BUGFIX: de-duplicate data exported via [/api/v1/export/csv](https://docs.victoriametrics.com/#how-to-export-csv-data) by default if [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. The de-duplication can be disabled by passing `reduce_mem_usage=1` query arg to `/api/v1/export/csv`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1837). ## [v1.70.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.70.0) diff --git a/docs/README.md b/docs/README.md index d95445f07..5a444567c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -837,7 +837,7 @@ unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) val The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv](#how-to-import-csv-data). -The [deduplication](#deduplication) isn't applied for the data exported in CSV. It is expected that the de-duplication is performed during data import. +The [deduplication](#deduplication) is applied for the data exported in CSV by default. It is possible to export raw data without de-duplication by passing `reduce_mem_usage=1` query arg to `/api/v1/export/csv`. ### How to export data in native format diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 5a90eac51..148728d92 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -841,7 +841,7 @@ unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) val The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv](#how-to-import-csv-data). -The [deduplication](#deduplication) isn't applied for the data exported in CSV. It is expected that the de-duplication is performed during data import. +The [deduplication](#deduplication) is applied for the data exported in CSV by default. It is possible to export raw data without de-duplication by passing `reduce_mem_usage=1` query arg to `/api/v1/export/csv`. ### How to export data in native format