diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index a4935d4e6..f59bdc58d 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -11,7 +11,6 @@ import ( "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -571,26 +570,3 @@ func getLensBuffer(n int) *lensBuffer { func putLensBuffer(lb *lensBuffer) { lensBufferPool.Put(lb) } - -func getInmemoryBlock() *inmemoryBlock { - select { - case ib := <-ibPoolCh: - return ib - default: - return &inmemoryBlock{} - } -} - -func putInmemoryBlock(ib *inmemoryBlock) { - ib.Reset() - select { - case ibPoolCh <- ib: - default: - // drop ib in order to reduce memory usage on systems with big number of CPU cores - } -} - -// Every inmemoryBlock struct occupies at least 64KB of memory, e.g. quite big amounts of memory. -// Use a chan instead of sync.Pool in order to reduce memory usage on systems -// with big number of CPU cores. -var ibPoolCh = make(chan *inmemoryBlock, 100*cgroup.AvailableCPUs()) diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 8ee302a3c..29a0a6723 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -325,7 +325,7 @@ func (ps *partSearch) readInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) ps.sb.lensData = bytesutil.ResizeNoCopyMayOverallocate(ps.sb.lensData, int(bh.lensBlockSize)) ps.p.lensFile.MustReadAt(ps.sb.lensData, int64(bh.lensBlockOffset)) - ib := getInmemoryBlock() + ib := &inmemoryBlock{} if err := ib.UnmarshalData(&ps.sb, bh.firstItem, bh.commonPrefix, bh.itemsCount, bh.marshalType); err != nil { return nil, fmt.Errorf("cannot unmarshal storage block with %d items: %w", bh.itemsCount, err) } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index b4e64dbe4..5c692a5ff 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -233,7 +233,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ris.mu.Lock() ibs := ris.ibs if len(ibs) == 0 { - ib := getInmemoryBlock() + ib := &nmemoryBlock{} ibs = append(ibs, ib) ris.ibs = ibs } @@ -249,12 +249,11 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp()) break } - ib = getInmemoryBlock() + ib = &nmemoryBlock{} if ib.Add(item) { ibs = append(ibs, ib) continue } - putInmemoryBlock(ib) logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item)) } ris.ibs = ibs @@ -882,7 +881,6 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { } bsr := getBlockStreamReader() bsr.MustInitFromInmemoryBlock(ib) - putInmemoryBlock(ib) bsrs = append(bsrs, bsr) } if len(bsrs) == 0 {