VictoriaMetrics/app/vminsert/clusternative/request_handler.go
Aliaksandr Valialkin b275983403
lib/writeconcurrencylimiter: improve the logic behind -maxConcurrentInserts limit
Previously the -maxConcurrentInserts was limiting the number of established client connections,
which write data to VictoriaMetrics. Some of these connections could be idle.
Such connections do not consume big amounts of CPU and RAM, so there is a little sense in limiting
the number of such connections. So now the -maxConcurrentInserts command-line option
limits the number of concurrently executed insert requests, not including idle connections.

It is recommended removing -maxConcurrentInserts command-line option, since the default value
for this option should work good for most cases.
2023-01-06 22:07:16 -08:00

75 lines
2.4 KiB
Go

package clusternative
import (
"fmt"
"net"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/clusternative"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="clusternative"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="clusternative"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="clusternative"}`)
)
// InsertHandler processes data from vminsert nodes.
func InsertHandler(c net.Conn) error {
bc, err := handshake.VMInsertServer(c, 0)
if err != nil {
return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
}
return parser.ParseStream(bc, func(rows []storage.MetricRow) error {
return insertRows(rows)
}, nil)
}
func insertRows(rows []storage.MetricRow) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
hasRelabeling := relabel.HasRelabeling()
var at auth.Token
var rowsPerTenant *metrics.Counter
var mn storage.MetricName
for i := range rows {
mr := &rows[i]
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw: %w", err)
}
if rowsPerTenant == nil || mn.AccountID != at.AccountID || mn.ProjectID != at.ProjectID {
at.AccountID = mn.AccountID
at.ProjectID = mn.ProjectID
rowsPerTenant = rowsTenantInserted.Get(&at)
}
ctx.Labels = ctx.Labels[:0]
ctx.AddLabelBytes(nil, mn.MetricGroup)
for j := range mn.Tags {
tag := &mn.Tags[j]
ctx.AddLabelBytes(tag.Key, tag.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(&at, ctx.Labels, mr.Timestamp, mr.Value); err != nil {
return err
}
rowsPerTenant.Inc()
}
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
}