From 47702947326ceadd51887aad1b354657c785e10f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 20 Apr 2024 21:59:49 +0200 Subject: [PATCH] lib/protoparser: substitute hybrid channel-based pools with plain sync.Pool Using plain sync.Pool simplifies the code without increasing memory usage and CPU usage. So it is better to use plain sync.Pool from readability and maintainability PoV. This is a follow-up for 8942f290eb95719cae2eb9298d3df074fd65bde5 --- .../csvimport/stream/streamparser.go | 24 ++++------------ .../datadogsketches/stream/streamparser.go | 24 ++++------------ .../datadogv1/stream/streamparser.go | 24 ++++------------ .../datadogv2/stream/streamparser.go | 24 ++++------------ .../graphite/stream/streamparser.go | 24 ++++------------ lib/protoparser/influx/stream/streamparser.go | 24 ++++------------ .../newrelic/stream/streamparser.go | 28 ++++++------------- .../opentsdb/stream/streamparser.go | 24 ++++------------ .../opentsdbhttp/stream/streamparser.go | 24 ++++------------ .../prometheus/stream/streamparser.go | 24 ++++------------ .../promremotewrite/stream/streamparser.go | 28 +++++-------------- .../vmimport/stream/streamparser.go | 24 ++++------------ 12 files changed, 75 insertions(+), 221 deletions(-) diff --git a/lib/protoparser/csvimport/stream/streamparser.go b/lib/protoparser/csvimport/stream/streamparser.go index 02e0e8933..1c9fc178f 100644 --- a/lib/protoparser/csvimport/stream/streamparser.go +++ b/lib/protoparser/csvimport/stream/streamparser.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -121,33 +120,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows csvimport.Rows diff --git a/lib/protoparser/datadogsketches/stream/streamparser.go b/lib/protoparser/datadogsketches/stream/streamparser.go index c81cee1e5..3aa287fcd 100644 --- a/lib/protoparser/datadogsketches/stream/streamparser.go +++ b/lib/protoparser/datadogsketches/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" @@ -104,33 +103,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogsketches.SketchPayload { v := requestPool.Get() diff --git a/lib/protoparser/datadogv1/stream/streamparser.go b/lib/protoparser/datadogv1/stream/streamparser.go index 961645f04..e8c99a335 100644 --- a/lib/protoparser/datadogv1/stream/streamparser.go +++ b/lib/protoparser/datadogv1/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" @@ -104,33 +103,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogv1.Request { v := requestPool.Get() diff --git a/lib/protoparser/datadogv2/stream/streamparser.go b/lib/protoparser/datadogv2/stream/streamparser.go index 0fe06269d..7f41a1e93 100644 --- a/lib/protoparser/datadogv2/stream/streamparser.go +++ b/lib/protoparser/datadogv2/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" @@ -111,33 +110,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogv2.Request { v := requestPool.Get() diff --git a/lib/protoparser/graphite/stream/streamparser.go b/lib/protoparser/graphite/stream/streamparser.go index 6d8780718..ef1dbfee7 100644 --- a/lib/protoparser/graphite/stream/streamparser.go +++ b/lib/protoparser/graphite/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" @@ -116,33 +115,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows graphite.Rows diff --git a/lib/protoparser/influx/stream/streamparser.go b/lib/protoparser/influx/stream/streamparser.go index a01de5639..a1df19aa2 100644 --- a/lib/protoparser/influx/stream/streamparser.go +++ b/lib/protoparser/influx/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" @@ -134,33 +133,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows influx.Rows diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go index ee2887829..7a34eeaf2 100644 --- a/lib/protoparser/newrelic/stream/streamparser.go +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -89,9 +88,6 @@ var ( unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) ) -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) - type pushCtx struct { br *bufio.Reader reqBuf bytesutil.ByteBuffer @@ -119,27 +115,19 @@ func (ctx *pushCtx) reset() { } func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } + +var pushCtxPool sync.Pool diff --git a/lib/protoparser/opentsdb/stream/streamparser.go b/lib/protoparser/opentsdb/stream/streamparser.go index d7bdbea0e..0a9e2c057 100644 --- a/lib/protoparser/opentsdb/stream/streamparser.go +++ b/lib/protoparser/opentsdb/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" @@ -106,33 +105,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows opentsdb.Rows diff --git a/lib/protoparser/opentsdbhttp/stream/streamparser.go b/lib/protoparser/opentsdbhttp/stream/streamparser.go index 03d5e4cb3..47b9ee554 100644 --- a/lib/protoparser/opentsdbhttp/stream/streamparser.go +++ b/lib/protoparser/opentsdbhttp/stream/streamparser.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -128,33 +127,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) func getRows() *opentsdbhttp.Rows { v := rowsPool.Get() diff --git a/lib/protoparser/prometheus/stream/streamparser.go b/lib/protoparser/prometheus/stream/streamparser.go index 6463b2c3a..cc3ff8446 100644 --- a/lib/protoparser/prometheus/stream/streamparser.go +++ b/lib/protoparser/prometheus/stream/streamparser.go @@ -8,7 +8,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -117,33 +116,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows prometheus.Rows diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index 546c8c0f2..33dd4efc4 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -123,35 +122,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } -var ( - pushCtxPool sync.Pool - pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) -) +var pushCtxPool sync.Pool func getWriteRequest() *prompb.WriteRequest { v := writeRequestPool.Get() diff --git a/lib/protoparser/vmimport/stream/streamparser.go b/lib/protoparser/vmimport/stream/streamparser.go index 23bcb5f8e..5b4b829b2 100644 --- a/lib/protoparser/vmimport/stream/streamparser.go +++ b/lib/protoparser/vmimport/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" @@ -110,33 +109,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows vmimport.Rows