From 4318f3464480ab6e4707a68c32a46f41f7f2e9d2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 20 Apr 2024 21:59:49 +0200 Subject: [PATCH] lib/protoparser: substitute hybrid channel-based pools with plain sync.Pool Using plain sync.Pool simplifies the code without increasing memory usage and CPU usage. So it is better to use plain sync.Pool from readability and maintainability PoV. This is a follow-up for 8942f290eb95719cae2eb9298d3df074fd65bde5 --- .../csvimport/stream/streamparser.go | 24 ++++------------ .../datadogsketches/stream/streamparser.go | 24 ++++------------ .../datadogv1/stream/streamparser.go | 24 ++++------------ .../datadogv2/stream/streamparser.go | 24 ++++------------ .../graphite/stream/streamparser.go | 24 ++++------------ lib/protoparser/influx/stream/streamparser.go | 24 ++++------------ .../newrelic/stream/streamparser.go | 28 ++++++------------- .../opentsdb/stream/streamparser.go | 24 ++++------------ .../opentsdbhttp/stream/streamparser.go | 24 ++++------------ .../prometheus/stream/streamparser.go | 24 ++++------------ .../promremotewrite/stream/streamparser.go | 28 +++++-------------- .../vmimport/stream/streamparser.go | 24 ++++------------ 12 files changed, 75 insertions(+), 221 deletions(-) diff --git a/lib/protoparser/csvimport/stream/streamparser.go b/lib/protoparser/csvimport/stream/streamparser.go index 02e0e8933c..1c9fc178f2 100644 --- a/lib/protoparser/csvimport/stream/streamparser.go +++ b/lib/protoparser/csvimport/stream/streamparser.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -121,33 +120,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows csvimport.Rows diff --git a/lib/protoparser/datadogsketches/stream/streamparser.go b/lib/protoparser/datadogsketches/stream/streamparser.go index c81cee1e5a..3aa287fcd8 100644 --- a/lib/protoparser/datadogsketches/stream/streamparser.go +++ b/lib/protoparser/datadogsketches/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches" @@ -104,33 +103,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogsketches.SketchPayload { v := requestPool.Get() diff --git a/lib/protoparser/datadogv1/stream/streamparser.go b/lib/protoparser/datadogv1/stream/streamparser.go index 961645f04d..e8c99a3357 100644 --- a/lib/protoparser/datadogv1/stream/streamparser.go +++ b/lib/protoparser/datadogv1/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" @@ -104,33 +103,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogv1.Request { v := requestPool.Get() diff --git a/lib/protoparser/datadogv2/stream/streamparser.go b/lib/protoparser/datadogv2/stream/streamparser.go index 0fe06269dc..7f41a1e933 100644 --- a/lib/protoparser/datadogv2/stream/streamparser.go +++ b/lib/protoparser/datadogv2/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils" @@ -111,33 +110,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getRequest() *datadogv2.Request { v := requestPool.Get() diff --git a/lib/protoparser/graphite/stream/streamparser.go b/lib/protoparser/graphite/stream/streamparser.go index 6d8780718d..ef1dbfee78 100644 --- a/lib/protoparser/graphite/stream/streamparser.go +++ b/lib/protoparser/graphite/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" @@ -116,33 +115,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows graphite.Rows diff --git a/lib/protoparser/influx/stream/streamparser.go b/lib/protoparser/influx/stream/streamparser.go index a01de56393..a1df19aa2e 100644 --- a/lib/protoparser/influx/stream/streamparser.go +++ b/lib/protoparser/influx/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" @@ -134,33 +133,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows influx.Rows diff --git a/lib/protoparser/newrelic/stream/streamparser.go b/lib/protoparser/newrelic/stream/streamparser.go index ee28878295..7a34eeaf2c 100644 --- a/lib/protoparser/newrelic/stream/streamparser.go +++ b/lib/protoparser/newrelic/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -89,9 +88,6 @@ var ( unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`) ) -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) - type pushCtx struct { br *bufio.Reader reqBuf bytesutil.ByteBuffer @@ -119,27 +115,19 @@ func (ctx *pushCtx) reset() { } func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } + +var pushCtxPool sync.Pool diff --git a/lib/protoparser/opentsdb/stream/streamparser.go b/lib/protoparser/opentsdb/stream/streamparser.go index d7bdbea0e4..0a9e2c057f 100644 --- a/lib/protoparser/opentsdb/stream/streamparser.go +++ b/lib/protoparser/opentsdb/stream/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" @@ -106,33 +105,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows opentsdb.Rows diff --git a/lib/protoparser/opentsdbhttp/stream/streamparser.go b/lib/protoparser/opentsdbhttp/stream/streamparser.go index 03d5e4cb3c..47b9ee5548 100644 --- a/lib/protoparser/opentsdbhttp/stream/streamparser.go +++ b/lib/protoparser/opentsdbhttp/stream/streamparser.go @@ -10,7 +10,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -128,33 +127,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) func getRows() *opentsdbhttp.Rows { v := rowsPool.Get() diff --git a/lib/protoparser/prometheus/stream/streamparser.go b/lib/protoparser/prometheus/stream/streamparser.go index 6463b2c3a9..cc3ff8446e 100644 --- a/lib/protoparser/prometheus/stream/streamparser.go +++ b/lib/protoparser/prometheus/stream/streamparser.go @@ -8,7 +8,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -117,33 +116,22 @@ var ( ) func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows prometheus.Rows diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index 546c8c0f25..33dd4efc42 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -123,35 +122,22 @@ var ( ) func getPushCtx(r io.Reader) *pushCtx { - select { - case ctx := <-pushCtxPoolCh: + if v := pushCtxPool.Get(); v != nil { + ctx := v.(*pushCtx) ctx.br.Reset(r) return ctx - default: - if v := pushCtxPool.Get(); v != nil { - ctx := v.(*pushCtx) - ctx.br.Reset(r) - return ctx - } - return &pushCtx{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &pushCtx{ + br: bufio.NewReaderSize(r, 64*1024), } } func putPushCtx(ctx *pushCtx) { ctx.reset() - select { - case pushCtxPoolCh <- ctx: - default: - pushCtxPool.Put(ctx) - } + pushCtxPool.Put(ctx) } -var ( - pushCtxPool sync.Pool - pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) -) +var pushCtxPool sync.Pool func getWriteRequest() *prompb.WriteRequest { v := writeRequestPool.Get() diff --git a/lib/protoparser/vmimport/stream/streamparser.go b/lib/protoparser/vmimport/stream/streamparser.go index 23bcb5f8e4..5b4b829b21 100644 --- a/lib/protoparser/vmimport/stream/streamparser.go +++ b/lib/protoparser/vmimport/stream/streamparser.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" @@ -110,33 +109,22 @@ func (ctx *streamContext) reset() { } func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: + if v := streamContextPool.Get(); v != nil { + ctx := v.(*streamContext) ctx.br.Reset(r) return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } + } + return &streamContext{ + br: bufio.NewReaderSize(r, 64*1024), } } func putStreamContext(ctx *streamContext) { ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } + streamContextPool.Put(ctx) } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows vmimport.Rows