From e17702fada132e001529f5f4c5c7b75caecdd9db Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Tue, 10 Mar 2020 21:45:15 +0200
Subject: [PATCH] app/vmselect: add optional `max_rows_per_line` query arg to
 `/api/v1/export`

This arg allows limiting the number of data points that may be exported on a single line.
---
 README.md                             |  3 +++
 app/vmselect/prometheus/prometheus.go | 37 ++++++++++++++++++++++++---
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 46e403e29..730ec68bd 100644
--- a/README.md
+++ b/README.md
@@ -629,6 +629,9 @@ Each JSON line would contain data for a single time series. An example output:
 Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
 unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
 
+Optional `max_rows_per_line` arg may be added to the request in order to limit the maximum number of rows exported per each JSON line.
+By default each JSON line contains all the rows for a single time series.
+
 Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporing big amounts
 of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data:
 
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 91890a686..1d182b14c 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -3,6 +3,7 @@ package prometheus
 import (
 	"flag"
 	"fmt"
+	"io"
 	"math"
 	"net/http"
 	"runtime"
@@ -18,6 +19,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
+	"github.com/valyala/fastjson/fastfloat"
 	"github.com/valyala/quicktemplate"
 )
 
@@ -129,11 +131,12 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
 		return err
 	}
 	format := r.FormValue("format")
+	maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
 	deadline := getDeadlineForExport(r)
 	if start >= end {
 		end = start + defaultStep
 	}
-	if err := exportHandler(w, matches, start, end, format, deadline); err != nil {
+	if err := exportHandler(w, matches, start, end, format, maxRowsPerLine, deadline); err != nil {
 		return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %s", matches, start, end, err)
 	}
 	exportDuration.UpdateDuration(startTime)
@@ -142,9 +145,37 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
 
 var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
 
-func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, deadline netstorage.Deadline) error {
+func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error {
 	writeResponseFunc := WriteExportStdResponse
 	writeLineFunc := WriteExportJSONLine
+	if maxRowsPerLine > 0 {
+		writeLineFunc = func(w io.Writer, rs *netstorage.Result) {
+			valuesOrig := rs.Values
+			timestampsOrig := rs.Timestamps
+			values := valuesOrig
+			timestamps := timestampsOrig
+			for len(values) > 0 {
+				var valuesChunk []float64
+				var timestampsChunk []int64
+				if len(values) > maxRowsPerLine {
+					valuesChunk = values[:maxRowsPerLine]
+					timestampsChunk = timestamps[:maxRowsPerLine]
+					values = values[maxRowsPerLine:]
+					timestamps = timestamps[maxRowsPerLine:]
+				} else {
+					valuesChunk = values
+					timestampsChunk = timestamps
+					values = nil
+					timestamps = nil
+				}
+				rs.Values = valuesChunk
+				rs.Timestamps = timestampsChunk
+				WriteExportJSONLine(w, rs)
+			}
+			rs.Values = valuesOrig
+			rs.Timestamps = timestampsOrig
+		}
+	}
 	contentType := "application/stream+json"
 	if format == "prometheus" {
 		contentType = "text/plain"
@@ -576,7 +607,7 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e
 		start -= offset
 		end := start
 		start = end - window
-		if err := exportHandler(w, []string{childQuery}, start, end, "promapi", deadline); err != nil {
+		if err := exportHandler(w, []string{childQuery}, start, end, "promapi", 0, deadline); err != nil {
 			return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %s", childQuery, start, end, err)
 		}
 		queryDuration.UpdateDuration(startTime)