lib/protoparser: substitute hybrid channel-based pools with plain sync.Pool

Using plain sync.Pool simplifies the code without increasing memory usage and CPU usage.
So it is better to use plain sync.Pool from readability and maintainability PoV.

This is a follow-up for 8942f290eb
This commit is contained in:
Aliaksandr Valialkin 2024-04-20 21:59:49 +02:00
parent 8942f290eb
commit 4770294732
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
12 changed files with 75 additions and 221 deletions

View file

@ -10,7 +10,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
@ -121,33 +120,22 @@ func (ctx *streamContext) reset() {
}
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows csvimport.Rows

View file

@ -7,7 +7,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogsketches"
@ -104,33 +103,22 @@ var (
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
pushCtxPool.Put(ctx)
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadogsketches.SketchPayload {
v := requestPool.Get()

View file

@ -7,7 +7,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
@ -104,33 +103,22 @@ var (
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
pushCtxPool.Put(ctx)
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadogv1.Request {
v := requestPool.Get()

View file

@ -7,7 +7,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadogutils"
@ -111,33 +110,22 @@ var (
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
pushCtxPool.Put(ctx)
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *datadogv2.Request {
v := requestPool.Get()

View file

@ -9,7 +9,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
@ -116,33 +115,22 @@ var (
)
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows graphite.Rows

View file

@ -9,7 +9,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
@ -134,33 +133,22 @@ func (ctx *streamContext) reset() {
}
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows influx.Rows

View file

@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -89,9 +88,6 @@ var (
unmarshalErrors = metrics.NewCounter(`vm_protoparser_unmarshal_errors_total{type="newrelic"}`)
)
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
type pushCtx struct {
br *bufio.Reader
reqBuf bytesutil.ByteBuffer
@ -119,27 +115,19 @@ func (ctx *pushCtx) reset() {
}
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
pushCtxPool.Put(ctx)
}
var pushCtxPool sync.Pool

View file

@ -9,7 +9,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
@ -106,33 +105,22 @@ var (
)
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows opentsdb.Rows

View file

@ -10,7 +10,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -128,33 +127,22 @@ var (
)
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
func getRows() *opentsdbhttp.Rows {
v := rowsPool.Get()

View file

@ -8,7 +8,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
@ -117,33 +116,22 @@ var (
)
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows prometheus.Rows

View file

@ -7,7 +7,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -123,35 +122,22 @@ var (
)
func getPushCtx(r io.Reader) *pushCtx {
select {
case ctx := <-pushCtxPoolCh:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
default:
if v := pushCtxPool.Get(); v != nil {
ctx := v.(*pushCtx)
ctx.br.Reset(r)
return ctx
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &pushCtx{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
select {
case pushCtxPoolCh <- ctx:
default:
pushCtxPool.Put(ctx)
}
pushCtxPool.Put(ctx)
}
var (
pushCtxPool sync.Pool
pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
)
var pushCtxPool sync.Pool
func getWriteRequest() *prompb.WriteRequest {
v := writeRequestPool.Get()

View file

@ -7,7 +7,6 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
@ -110,33 +109,22 @@ func (ctx *streamContext) reset() {
}
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
streamContextPool.Put(ctx)
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows vmimport.Rows