VictoriaMetrics/app/vminsert/influx/request_handler.go

120 lines
3.3 KiB
Go
Raw Normal View History

2019-05-22 21:16:55 +00:00
package influx
import (
"flag"
2019-05-22 21:16:55 +00:00
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags controlling how Influx line protocol rows are mapped to metric names.
var (
	// measurementFieldSeparator is placed between the measurement and the field
	// name when insertRows builds a metric name.
	measurementFieldSeparator = flag.String("influxMeasurementFieldSeparator", "_", "Separator for '{measurement}{separator}{field_name}' metric name when inserted via Influx line protocol")

	// skipSingleField makes insertRows use the measurement alone as the metric
	// name for rows that carry exactly one field.
	skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if Influx line contains only a single field")
)
// Metrics describing the Influx ingestion path.
var (
	// rowsInserted is increased by the number of field values handled by each
	// insertRows call.
	rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="influx"}`)

	// rowsPerInsert is updated with the number of field values handled by each
	// insertRows call.
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="influx"}`)
)
2019-05-22 21:16:55 +00:00
// InsertHandler processes remote write for influx line protocol.
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandler(req *http.Request) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
2019-05-22 21:16:55 +00:00
})
}
func insertRows(db string, rows []parser.Row) error {
2019-05-22 21:16:55 +00:00
ctx := getPushCtx()
defer putPushCtx(ctx)
rowsLen := 0
for i := range rows {
rowsLen += len(rows[i].Fields)
2019-05-22 21:16:55 +00:00
}
ic := &ctx.Common
ic.Reset(rowsLen)
rowsTotal := 0
2019-05-22 21:16:55 +00:00
for i := range rows {
r := &rows[i]
ic.Labels = ic.Labels[:0]
hasDBLabel := false
2019-05-22 21:16:55 +00:00
for j := range r.Tags {
tag := &r.Tags[j]
if tag.Key == "db" {
hasDBLabel = true
}
2019-05-22 21:16:55 +00:00
ic.AddLabel(tag.Key, tag.Value)
}
if len(db) > 0 && !hasDBLabel {
ic.AddLabel("db", db)
}
2019-05-22 21:16:55 +00:00
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...)
skipFieldKey := len(r.Fields) == 1 && *skipSingleField
if len(ctx.metricGroupBuf) > 0 && !skipFieldKey {
ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
}
2019-05-22 21:16:55 +00:00
metricGroupPrefixLen := len(ctx.metricGroupBuf)
for j := range r.Fields {
f := &r.Fields[j]
if !skipFieldKey {
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
}
2019-05-22 21:16:55 +00:00
metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
ic.Labels = ic.Labels[:0]
ic.AddLabel("", metricGroup)
ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[:1], r.Timestamp, f.Value)
}
rowsTotal += len(r.Fields)
2019-05-22 21:16:55 +00:00
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
2019-05-22 21:16:55 +00:00
return ic.FlushBufs()
}
type pushCtx struct {
Common common.InsertCtx
2019-05-22 21:16:55 +00:00
metricNameBuf []byte
metricGroupBuf []byte
}
// reset clears ctx before it is handed back for reuse.
//
// The insert context is reset with a zero row count and both scratch buffers
// are truncated to zero length, which keeps their capacity.
func (ctx *pushCtx) reset() {
	ctx.Common.Reset(0)
	ctx.metricGroupBuf = ctx.metricGroupBuf[:0]
	ctx.metricNameBuf = ctx.metricNameBuf[:0]
}
// getPushCtx returns a pushCtx for use by a single insertRows call.
//
// It tries, in order: a context waiting in pushCtxPoolCh, a context stored in
// pushCtxPool, and finally a freshly allocated one. It never blocks on the
// channel.
func getPushCtx() *pushCtx {
	select {
	case ctx := <-pushCtxPoolCh:
		return ctx
	default:
		// The channel is empty; fall through to the sync.Pool.
	}

	if v := pushCtxPool.Get(); v != nil {
		return v.(*pushCtx)
	}
	return &pushCtx{}
}
// putPushCtx resets ctx and returns it for reuse.
//
// The context goes into pushCtxPoolCh when there is room; otherwise it is put
// into pushCtxPool. ctx must not be used by the caller after this call.
func putPushCtx(ctx *pushCtx) {
	ctx.reset()

	select {
	case pushCtxPoolCh <- ctx:
		return
	default:
		// The channel is full; fall through to the sync.Pool.
	}
	pushCtxPool.Put(ctx)
}
// Storage for reusable pushCtx objects; see getPushCtx and putPushCtx.
var (
	// pushCtxPool takes the contexts that do not fit into pushCtxPoolCh.
	pushCtxPool sync.Pool

	// pushCtxPoolCh is tried before pushCtxPool. Its capacity is the GOMAXPROCS
	// value at package initialization; a negative argument only queries the
	// setting without changing it.
	pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
)