VictoriaMetrics/app/vminsert/opentsdb/request_handler.go
Aliaksandr Valialkin dc9eafcd02 app/{vminsert,vmagent}: add -sortLabels command-line option for sorting time series labels before ingesting them in the storage
This option can be useful when samples for the same time series are ingested with distinct order of labels.
For example, metric{k1="v1",k2="v2"} and metric{k2="v2",k1="v1"}.
2021-03-31 23:27:58 +03:00

56 lines
1.5 KiB
Go

package opentsdb
import (
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentsdb"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentsdb"}`)
)
// InsertHandler processes remote write for OpenTSDB put protocol.
//
// See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
}
func insertRows(rows []parser.Row) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
ctx.Reset(len(rows))
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", r.Metric)
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}
}
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
}