app/vminsert: properly ingest influx metrics

Commit 71bb9fc0d0 introduced a regression.
If labels are empty and relabeling is not configured, the influx ingestion handler
performed an early exit due to the TryPrepareLabels call.
Due to micro-optimisations for this protocol, this check was not valid,
since it didn't take into account metricName, which is added later, and it skipped the metrics line.

This commit removes the `TryPrepareLabels` function call from this path and inlines it instead.
It properly tracks the empty-labels path.

Adds an initial test implementation for the data ingestion protocols.

 Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7933

Signed-off-by: f41gh7 <nik@victoriametrics.com>
This commit is contained in:
f41gh7 2024-12-26 11:59:11 +01:00
parent e0b2c1c4f5
commit fc336bbf20
No known key found for this signature in database
GPG key ID: 4558311CF775EC72
7 changed files with 363 additions and 7 deletions

View file

@ -105,7 +105,7 @@ func (ctx *InsertCtx) TryPrepareLabels(hasRelabeling bool) bool {
if timeserieslimits.Enabled() && timeserieslimits.IsExceeding(ctx.Labels) {
return false
}
ctx.sortLabelsIfNeeded()
ctx.SortLabelsIfNeeded()
return true
}

View file

@ -12,8 +12,8 @@ var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for inco
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. `+
`Enabled sorting for labels can slow down ingestion performance a bit`)
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set
func (ctx *InsertCtx) sortLabelsIfNeeded() {
// SortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set
func (ctx *InsertCtx) SortLabelsIfNeeded() {
if *sortLabels {
sort.Sort(&ctx.Labels)
}

View file

@ -118,8 +118,14 @@ func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label)
}
}
} else {
if !ic.TryPrepareLabels(false) {
continue
// special case for optimisations below
// do not call TryPrepareLabels
// manually apply sort and limits on demand
ic.SortLabelsIfNeeded()
if hasLimitsEnabled {
if timeserieslimits.IsExceeding(ic.Labels) {
continue
}
}
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
labelsLen := len(ic.Labels)
@ -132,8 +138,6 @@ func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label)
ic.Labels = ic.Labels[:labelsLen]
ic.AddLabel("", metricGroup)
if hasLimitsEnabled {
// special case for optimisation above
// check only __name__ label value limits
if timeserieslimits.IsExceeding(ic.Labels[len(ic.Labels)-1:]) {
continue
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"net/url"
"slices"
"sort"
"strconv"
"strings"
"testing"
@ -103,6 +104,25 @@ func NewPrometheusAPIV1QueryResponse(t *testing.T, s string) *PrometheusAPIV1Que
return res
}
// Sort orders data.Result by a canonical string representation of each
// result's metric labels, so that responses can be compared deterministically.
func (pqr *PrometheusAPIV1QueryResponse) Sort() {
	// metricKey flattens a label map into sorted "k=v" pairs joined with
	// commas, yielding a stable comparison key regardless of map iteration order.
	metricKey := func(m map[string]string) string {
		pairs := make([]string, 0, len(m))
		for name, value := range m {
			pairs = append(pairs, fmt.Sprintf("%s=%s", name, value))
		}
		sort.Strings(pairs)
		return strings.Join(pairs, ",")
	}
	sort.Slice(pqr.Data.Result, func(i, j int) bool {
		return metricKey(pqr.Data.Result[i].Metric) < metricKey(pqr.Data.Result[j].Metric)
	})
}
// QueryData holds the query result along with its type.
type QueryData struct {
ResultType string

View file

@ -0,0 +1,318 @@
package tests
import (
"os"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
at "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// TestSingleIngestionProtocols verifies that a single-node VictoriaMetrics
// instance correctly ingests data sent via the Influx line protocol, the
// Prometheus text exposition format and the Prometheus remote-write protocol,
// and that the ingested series are returned by /api/v1/export.
func TestSingleIngestionProtocols(t *testing.T) {
	// Drop any state left over from a previous run of this test.
	os.RemoveAll(t.Name())
	tc := at.NewTestCase(t)
	defer tc.Stop()

	sut := tc.MustStartDefaultVmsingle()

	type opts struct {
		query       string
		wantMetrics []map[string]string
		wantSamples []*at.Sample
	}
	// f exports data matching opts.query and asserts that the response
	// contains exactly the expected metrics with the expected samples.
	// wantMetrics[idx] is paired with wantSamples[idx].
	f := func(sut at.PrometheusQuerier, opts *opts) {
		t.Helper()
		wantResult := []*at.QueryResult{}
		for idx, wm := range opts.wantMetrics {
			wantResult = append(wantResult, &at.QueryResult{
				Metric:  wm,
				Samples: []*at.Sample{opts.wantSamples[idx]},
			})
		}
		tc.Assert(&at.AssertOptions{
			Msg: "unexpected /export query response",
			Got: func() any {
				got := sut.PrometheusAPIV1Export(t, opts.query, at.QueryOpts{
					Start: "2024-02-05T08:50:00.700Z",
					End:   "2024-02-05T09:00:00.700Z",
				})
				// Sort for a deterministic comparison against wantResult.
				got.Sort()
				return got
			},
			Want: &at.PrometheusAPIV1QueryResponse{Data: &at.QueryData{Result: wantResult}},
			CmpOpts: []cmp.Option{
				cmpopts.IgnoreFields(at.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
			},
		})
	}

	// influx line format
	// The first line has no tags — this is the empty-labels case from
	// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7933.
	sut.InfluxWrite(t, []string{
		`influxline series1=10 1707123456700`,                                        // 2024-02-05T08:57:36.700Z
		`influxline,label=foo1,label1=value1,label2=value2 series2=40 1707123456800`, // 2024-02-05T08:57:36.800Z
	}, at.QueryOpts{})
	sut.ForceFlush(t)
	f(sut, &opts{
		query: `{__name__=~"influxline.+"}`,
		wantMetrics: []map[string]string{
			{
				"__name__": "influxline_series1",
			},
			{
				"__name__": "influxline_series2",
				"label":    "foo1",
				"label1":   "value1",
				"label2":   "value2",
			},
		},
		wantSamples: []*at.Sample{
			{Timestamp: 1707123456700, Value: 10},
			{Timestamp: 1707123456800, Value: 40},
		},
	})

	// prometheus text exposition format
	sut.PrometheusAPIV1ImportPrometheus(t, []string{
		`importprometheus_series 10 1707123456700`,                               // 2024-02-05T08:57:36.700Z
		`importprometheus_series2{label="foo",label1="value1"} 20 1707123456800`, // 2024-02-05T08:57:36.800Z
	}, at.QueryOpts{})
	sut.ForceFlush(t)
	f(sut, &opts{
		query: `{__name__=~"importprometheus.+"}`,
		wantMetrics: []map[string]string{
			{
				"__name__": "importprometheus_series",
			},
			{
				"__name__": "importprometheus_series2",
				"label":    "foo",
				"label1":   "value1",
			},
		},
		wantSamples: []*at.Sample{
			{Timestamp: 1707123456700, Value: 10},
			{Timestamp: 1707123456800, Value: 20},
		},
	})

	// prometheus remote write format
	pbData := []pb.TimeSeries{
		{
			Labels: []pb.Label{
				{
					Name:  "__name__",
					Value: "prometheusrw_series",
				},
			},
			Samples: []pb.Sample{
				{
					Value:     10,
					Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
				},
			},
		},
		{
			Labels: []pb.Label{
				{
					Name:  "__name__",
					Value: "prometheusrw_series2",
				},
				{
					Name:  "label",
					Value: "foo2",
				},
				{
					Name:  "label1",
					Value: "value1",
				},
			},
			Samples: []pb.Sample{
				{
					Value:     20,
					Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
				},
			},
		},
	}
	sut.PrometheusAPIV1Write(t, pbData, at.QueryOpts{})
	sut.ForceFlush(t)
	f(sut, &opts{
		query: `{__name__=~"prometheusrw.+"}`,
		wantMetrics: []map[string]string{
			{
				"__name__": "prometheusrw_series",
			},
			{
				"__name__": "prometheusrw_series2",
				"label":    "foo2",
				"label1":   "value1",
			},
		},
		wantSamples: []*at.Sample{
			{Timestamp: 1707123456700, Value: 10}, // 2024-02-05T08:57:36.700Z
			{Timestamp: 1707123456800, Value: 20}, // 2024-02-05T08:57:36.800Z
		},
	})
}
// TestClusterIngestionProtocols verifies that a VictoriaMetrics cluster
// (vmstorage + vminsert + vmselect) correctly ingests data sent via the
// Prometheus text exposition format, the Influx line protocol and the
// Prometheus remote-write protocol, and that the ingested series are
// returned by /api/v1/export.
func TestClusterIngestionProtocols(t *testing.T) {
	// Drop any state left over from a previous run of this test.
	os.RemoveAll(t.Name())
	tc := at.NewTestCase(t)
	defer tc.Stop()

	vmstorage := tc.MustStartVmstorage("vmstorage", []string{
		"-storageDataPath=" + tc.Dir() + "/vmstorage",
		"-retentionPeriod=100y",
	})
	vminsert := tc.MustStartVminsert("vminsert", []string{
		"-storageNode=" + vmstorage.VminsertAddr(),
	})
	vmselect := tc.MustStartVmselect("vmselect", []string{
		"-storageNode=" + vmstorage.VmselectAddr(),
	})

	type opts struct {
		query       string
		wantMetrics []map[string]string
		wantSamples []*at.Sample
	}
	// f exports data matching opts.query via vmselect and asserts that the
	// response contains exactly the expected metrics with the expected
	// samples. wantMetrics[idx] is paired with wantSamples[idx].
	f := func(opts *opts) {
		t.Helper()
		wantResult := []*at.QueryResult{}
		for idx, wm := range opts.wantMetrics {
			wantResult = append(wantResult, &at.QueryResult{
				Metric:  wm,
				Samples: []*at.Sample{opts.wantSamples[idx]},
			})
		}
		tc.Assert(&at.AssertOptions{
			Msg: "unexpected /export query response",
			Got: func() any {
				got := vmselect.PrometheusAPIV1Export(t, opts.query, at.QueryOpts{
					Start: "2024-02-05T08:50:00.700Z",
					End:   "2024-02-05T09:00:00.700Z",
				})
				// Sort for a deterministic comparison against wantResult.
				got.Sort()
				return got
			},
			Want: &at.PrometheusAPIV1QueryResponse{Data: &at.QueryData{Result: wantResult}},
			CmpOpts: []cmp.Option{
				cmpopts.IgnoreFields(at.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
			},
		})
	}

	// prometheus text exposition format
	vminsert.PrometheusAPIV1ImportPrometheus(t, []string{
		`importprometheus_series 10 1707123456700`,                               // 2024-02-05T08:57:36.700Z
		`importprometheus_series2{label="foo",label1="value1"} 20 1707123456800`, // 2024-02-05T08:57:36.800Z
	}, at.QueryOpts{})
	vmstorage.ForceFlush(t)
	f(&opts{
		query: `{__name__=~"importprometheus.+"}`,
		wantMetrics: []map[string]string{
			{
				"__name__": "importprometheus_series",
			},
			{
				"__name__": "importprometheus_series2",
				"label":    "foo",
				"label1":   "value1",
			},
		},
		wantSamples: []*at.Sample{
			{Timestamp: 1707123456700, Value: 10},
			{Timestamp: 1707123456800, Value: 20},
		},
	})

	// influx line format
	// The first line has no tags — this is the empty-labels case from
	// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7933.
	vminsert.InfluxWrite(t, []string{
		`influxline series1=10 1707123456700`,                                        // 2024-02-05T08:57:36.700Z
		`influxline,label=foo1,label1=value1,label2=value2 series2=40 1707123456800`, // 2024-02-05T08:57:36.800Z
	}, at.QueryOpts{})
	vmstorage.ForceFlush(t)
	f(&opts{
		query: `{__name__=~"influxline.+"}`,
		wantMetrics: []map[string]string{
			{
				"__name__": "influxline_series1",
			},
			{
				"__name__": "influxline_series2",
				"label":    "foo1",
				"label1":   "value1",
				"label2":   "value2",
			},
		},
		wantSamples: []*at.Sample{
			{Timestamp: 1707123456700, Value: 10},
			{Timestamp: 1707123456800, Value: 40},
		},
	})

	// prometheus remote write format
	pbData := []pb.TimeSeries{
		{
			Labels: []pb.Label{
				{
					Name:  "__name__",
					Value: "prometheusrw_series",
				},
			},
			Samples: []pb.Sample{
				{
					Value:     10,
					Timestamp: 1707123456700, // 2024-02-05T08:57:36.700Z
				},
			},
		},
		{
			Labels: []pb.Label{
				{
					Name:  "__name__",
					Value: "prometheusrw_series2",
				},
				{
					Name:  "label",
					Value: "foo2",
				},
				{
					Name:  "label1",
					Value: "value1",
				},
			},
			Samples: []pb.Sample{
				{
					Value:     20,
					Timestamp: 1707123456800, // 2024-02-05T08:57:36.800Z
				},
			},
		},
	}
	vminsert.PrometheusAPIV1Write(t, pbData, at.QueryOpts{})
	vmstorage.ForceFlush(t)
	f(&opts{
		query: `{__name__=~"prometheusrw.+"}`,
		wantMetrics: []map[string]string{
			{
				"__name__": "prometheusrw_series",
			},
			{
				"__name__": "prometheusrw_series2",
				"label":    "foo2",
				"label1":   "value1",
			},
		},
		wantSamples: []*at.Sample{
			{Timestamp: 1707123456700, Value: 10}, // 2024-02-05T08:57:36.700Z
			{Timestamp: 1707123456800, Value: 20}, // 2024-02-05T08:57:36.800Z
		},
	})
}

View file

@ -78,6 +78,19 @@ func (app *Vminsert) ClusternativeListenAddr() string {
return app.clusternativeListenAddr
}
// InfluxWrite is a test helper function that inserts a
// collection of records in Influx line format by sending a HTTP
// POST request to the /insert/<tenant>/influx/write vminsert endpoint.
//
// See https://docs.victoriametrics.com/url-examples/#influxwrite
func (app *Vminsert) InfluxWrite(t *testing.T, records []string, opts QueryOpts) {
	t.Helper()
	url := fmt.Sprintf("http://%s/insert/%s/influx/write", app.httpListenAddr, opts.getTenant())
	// Records are newline-separated, as required by the Influx line protocol.
	data := []byte(strings.Join(records, "\n"))
	// Successful ingestion responds with 204 No Content.
	app.cli.Post(t, url, "text/plain", data, http.StatusNoContent)
}
// PrometheusAPIV1Write is a test helper function that inserts a
// collection of records in Prometheus remote-write format by sending a HTTP
// POST request to /prometheus/api/v1/write vminsert endpoint.

View file

@ -26,6 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vminsert](https://docs.victoriametrics.com/vminsert/): Storage nodes defined in `-storageNode` are now sorted, ensuring that varying node orders across different vminsert instances do not result in inconsistent replication.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly set `host` field at debug information formatted with `dump_request_on_errors: true` setting.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/single-server-victoriametrics/) and `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly ingest `influx` line protocol metrics with empty tags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7933) for details.
## [v1.108.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.108.1)