diff --git a/lib/protoparser/csvimport/stream/streamparser.go b/lib/protoparser/csvimport/stream/streamparser.go index 02e0e8933..1c9fc178f 100644 --- a/lib/protoparser/csvimport/stream/streamparser.go +++ b/lib/protoparser/csvimport/stream/streamparser.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -121,33 +120,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows csvimport.Rows diff --git a/lib/protoparser/datadogsketches/stream/streamparser.go b/lib/protoparser/datadogsketches/stream/streamparser.go index c81cee1e5..3aa287fcd 100644 --- a/lib/protoparser/datadogsketches/stream/streamparser.go +++ b/lib/protoparser/datadogsketches/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" @@ -104,33 +103,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogsketches.SketchPayload { v := requestPool.Get() diff --git a/lib/protoparser/datadogv1/stream/streamparser.go b/lib/protoparser/datadogv1/stream/streamparser.go index 961645f04..e8c99a335 100644 --- a/lib/protoparser/datadogv1/stream/streamparser.go +++ b/lib/protoparser/datadogv1/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" @@ -104,33 +103,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogv1.Request { v := requestPool.Get() diff --git a/lib/protoparser/datadogv2/stream/streamparser.go b/lib/protoparser/datadogv2/stream/streamparser.go index 0fe06269d..7f41a1e93 100644 --- a/lib/protoparser/datadogv2/stream/streamparser.go +++ b/lib/protoparser/datadogv2/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" @@ -111,33 +110,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogv2.Request { v := requestPool.Get() diff --git a/lib/protoparser/graphite/stream/streamparser.go b/lib/protoparser/graphite/stream/streamparser.go index 6d8780718..ef1dbfee7 100644 --- a/lib/protoparser/graphite/stream/streamparser.go +++ b/lib/protoparser/graphite/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" @@ -116,33 +115,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows graphite.Rows diff --git a/lib/protoparser/influx/stream/streamparser.go b/lib/protoparser/influx/stream/streamparser.go index a01de5639..a1df19aa2 100644 --- a/lib/protoparser/influx/stream/streamparser.go +++ b/lib/protoparser/influx/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" @@ -134,33 +133,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows influx.Rows diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go index ee2887829..7a34eeaf2 100644 --- a/lib/protoparser/newrelic/stream/streamparser.go +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -89,9 +88,6 @@ var ( unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) ) -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) - type pushCtx struct { br *bufio.Reader reqBuf bytesutil.ByteBuffer @@ -119,27 +115,19 @@ func (ctx *pushCtx) reset() { } func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } + +var pushCtxPool sync.Pool diff --git a/lib/protoparser/opentsdb/stream/streamparser.go b/lib/protoparser/opentsdb/stream/streamparser.go index d7bdbea0e..0a9e2c057 100644 --- a/lib/protoparser/opentsdb/stream/streamparser.go +++ b/lib/protoparser/opentsdb/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" @@ -106,33 +105,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows opentsdb.Rows diff --git a/lib/protoparser/opentsdbhttp/stream/streamparser.go b/lib/protoparser/opentsdbhttp/stream/streamparser.go index 03d5e4cb3..47b9ee554 100644 --- a/lib/protoparser/opentsdbhttp/stream/streamparser.go +++ b/lib/protoparser/opentsdbhttp/stream/streamparser.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -128,33 +127,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) func getRows() *opentsdbhttp.Rows { v := rowsPool.Get() diff --git a/lib/protoparser/prometheus/stream/streamparser.go b/lib/protoparser/prometheus/stream/streamparser.go index 6463b2c3a..cc3ff8446 100644 --- a/lib/protoparser/prometheus/stream/streamparser.go +++ b/lib/protoparser/prometheus/stream/streamparser.go @@ -8,7 +8,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -117,33 +116,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows prometheus.Rows diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index 546c8c0f2..33dd4efc4 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -123,35 +122,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } -var ( - pushCtxPool sync.Pool - pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) -) +var pushCtxPool sync.Pool func getWriteRequest() *prompb.WriteRequest { v := writeRequestPool.Get() diff --git a/lib/protoparser/vmimport/stream/streamparser.go b/lib/protoparser/vmimport/stream/streamparser.go index 23bcb5f8e..5b4b829b2 100644 --- a/lib/protoparser/vmimport/stream/streamparser.go +++ b/lib/protoparser/vmimport/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" @@ -110,33 +109,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows vmimport.Rows